Editar

Partilhar via


Otimização do desempenho – Transmissão em fluxo de eventos

Azure Functions
Azure IoT Hub
Azure Cosmos DB

Este artigo descreve como uma equipa de desenvolvimento utilizou métricas para encontrar estrangulamentos e melhorar o desempenho de um sistema distribuído. O artigo baseia-se em testes de carga reais que fizemos para uma aplicação de exemplo.

Este artigo faz parte de uma série. Leia a primeira parte aqui.

Cenário: processe um fluxo de eventos com Funções do Azure.

Diagrama de uma arquitetura de transmissão em fluxo de eventos

Neste cenário, uma frota de drones envia dados de posição em tempo real para Hub IoT do Azure. Uma aplicação de Funções recebe os eventos, transforma os dados em formato GeoJSON e escreve os dados transformados no Azure Cosmos DB. O Azure Cosmos DB tem suporte nativo para dados geoespaciais e as coleções do Azure Cosmos DB podem ser indexadas para consultas espaciais eficientes. Por exemplo, uma aplicação cliente pode consultar todos os drones num raio de 1 km de uma determinada localização ou localizar todos os drones numa determinada área.

Estes requisitos de processamento são suficientemente simples para que não necessitem de um motor de processamento de fluxo completo. Em particular, o processamento não associa fluxos, agrega dados ou processa em intervalos de tempo. Com base nestes requisitos, Funções do Azure é uma boa opção para processar as mensagens. O Azure Cosmos DB também pode ser dimensionado para suportar um débito de escrita muito elevado.

Monitorizar débito

Este cenário apresenta um desafio de desempenho interessante. A taxa de dados por dispositivo é conhecida, mas o número de dispositivos pode flutuar. Para este cenário empresarial, os requisitos de latência não são particularmente rigorosos. A posição comunicada de um drone só tem de ser precisa dentro de um minuto. Dito isto, a aplicação de funções tem de acompanhar a taxa média de ingestão ao longo do tempo.

Hub IoT armazena mensagens num fluxo de registos. As mensagens recebidas são anexadas à cauda do fluxo. Um leitor da transmissão em fluxo , neste caso, a aplicação de funções, controla a sua própria taxa de passagem pelo fluxo. Esta desacoplação dos caminhos de leitura e escrita torna Hub IoT muito eficiente, mas também significa que um leitor lento pode ficar para trás. Para detetar esta condição, a equipa de desenvolvimento adicionou uma métrica personalizada para medir o atraso da mensagem. Esta métrica regista o delta entre quando uma mensagem chega ao Hub IoT e quando a função recebe a mensagem para processamento.

var ticksUTCNow = DateTimeOffset.UtcNow;

// Track whether messages are arriving at the function late.
DateTime? firstMsgEnqueuedTicksUtc = messages[0]?.EnqueuedTimeUtc;
if (firstMsgEnqueuedTicksUtc.HasValue)
{
    CustomTelemetry.TrackMetric(
                        context,
                        "IoTHubMessagesReceivedFreshnessMsec",
                        (ticksUTCNow - firstMsgEnqueuedTicksUtc.Value).TotalMilliseconds);
}

O TrackMetric método escreve uma métrica personalizada no Application Insights. Para obter informações sobre como utilizar TrackMetric dentro de uma Função do Azure, veja Telemetria personalizada na função C#.

Se a função acompanhar o volume de mensagens, esta métrica deverá manter-se num estado estável baixo. Alguma latência é inevitável, pelo que o valor nunca será zero. No entanto, se a função ficar para trás, o delta entre o tempo enqueutado e o tempo de processamento começará a aumentar.

Teste 1: Linha de Base

O primeiro teste de carga mostrou um problema imediato: a aplicação function recebeu consistentemente erros HTTP 429 do Azure Cosmos DB, indicando que o Azure Cosmos DB estava a limitar os pedidos de escrita.

Gráfico de pedidos limitados do Azure Cosmos DB

Em resposta, a equipa dimensionou o Azure Cosmos DB ao aumentar o número de RUs alocadas para a coleção, mas os erros continuaram. Isto parecia estranho, porque o cálculo da parte de trás do envelope mostrou que o Azure Cosmos DB não deveria ter problemas em acompanhar o volume de pedidos de escrita.

Mais tarde nesse dia, um dos programadores enviou o seguinte e-mail à equipa:

Olhei para o Azure Cosmos DB para o caminho quente. Há uma coisa que não compreendo. A chave de partição é deliveryId, mas não enviamos deliveryId para o Azure Cosmos DB. Estou a perder alguma coisa?

Esta foi a pista. Olhando para o mapa térmico da partição, descobriu-se que todos os documentos estavam a aterrar na mesma partição.

Gráfico do mapa térmico da partição do Azure Cosmos DB

O que pretende ver no mapa térmico é uma distribuição uniforme em todas as partições. Neste caso, porque cada documento estava a ser escrito na mesma partição, a adição de RUs não ajudou. O problema acabou por ser um erro no código. Embora a coleção do Azure Cosmos DB tivesse uma chave de partição, a Função do Azure não incluía a chave de partição no documento. Para obter mais informações sobre o mapa térmico da partição, veja Determine the throughput distribution across partitions (Determinar a distribuição de débito entre partições).

Teste 2: Corrigir o problema de criação de partições

Quando a equipa implementou uma correção de código e executou novamente o teste, o Azure Cosmos DB parou a limitação. Por um tempo, tudo parecia bem. No entanto, numa determinada carga, a telemetria mostrou que a função estava a escrever menos documentos que deveria. O gráfico seguinte mostra as mensagens recebidas de Hub IoT versus documentos escritos no Azure Cosmos DB. A linha amarela é o número de mensagens recebidas por lote e o verde é o número de documentos escritos por lote. Estes devem ser proporcionais. Em vez disso, o número de operações de escrita de bases de dados por lote diminui significativamente em cerca de 07:30.

Gráfico de mensagens removidas

O gráfico seguinte mostra a latência entre quando uma mensagem chega ao Hub IoT a partir de um dispositivo e quando a aplicação de funções processa essa mensagem. Pode ver que, ao mesmo tempo, a latência aumenta drasticamente, nivela e diminui.

Gráfico de latência da mensagem

A razão pela qual o valor atinge um pico de 5 minutos e, em seguida, cai para zero é porque a aplicação de funções elimina mensagens com mais de 5 minutos de atraso:

foreach (var message in messages)
{
    // Drop stale messages,
    if (message.EnqueuedTimeUtc < cutoffTime)
    {
        log.Info($"Dropping late message batch. Enqueued time = {message.EnqueuedTimeUtc}, Cutoff = {cutoffTime}");
        droppedMessages++;
        continue;
    }
}

Pode vê-lo no gráfico quando a métrica de latência cai para zero. Entretanto, os dados foram perdidos porque a função estava a deitar fora mensagens.

O que estava a acontecer? Para este teste de carga específico, a coleção do Azure Cosmos DB tinha RUs para poupar, pelo que o estrangulamento não estava na base de dados. Em vez disso, o problema estava no ciclo de processamento de mensagens. Simplificando, a função não estava a escrever documentos com rapidez suficiente para acompanhar o volume de mensagens recebido. Com o tempo, ficou cada vez mais para trás.

Teste 3: Escritas paralelas

Se o tempo para processar uma mensagem for o estrangulamento, uma solução é processar mais mensagens em paralelo. Neste cenário:

  • Aumente o número de partições Hub IoT. A cada Hub IoT partição é atribuída uma instância de função de cada vez, pelo que seria de esperar que o débito dimensionasse linearmente com o número de partições.
  • Paralelizar as escritas do documento na função.

Para explorar a segunda opção, a equipa modificou a função para suportar escritas paralelas. A versão original da função utilizou o enlace de saída do Azure Cosmos DB. A versão otimizada chama diretamente o cliente do Azure Cosmos DB e executa as escritas em paralelo com Task.WhenAll:

private async Task<(long documentsUpserted,
                    long droppedMessages,
                    long cosmosDbTotalMilliseconds)>
                ProcessMessagesFromEventHub(
                    int taskCount,
                    int numberOfDocumentsToUpsertPerTask,
                    EventData[] messages,
                    TraceWriter log)
{
    DateTimeOffset cutoffTime = DateTimeOffset.UtcNow.AddMinutes(-5);

    var tasks = new List<Task>();

    for (var i = 0; i < taskCount; i++)
    {
        var docsToUpsert = messages
                            .Skip(i * numberOfDocumentsToUpsertPerTask)
                            .Take(numberOfDocumentsToUpsertPerTask);
        // client will attempt to create connections to the data
        // nodes on Azure Cosmos DB clusters on a range of port numbers
        tasks.Add(UpsertDocuments(i, docsToUpsert, cutoffTime, log));
    }

    await Task.WhenAll(tasks);

    return (this.UpsertedDocuments,
            this.DroppedMessages,
            this.CosmosDbTotalMilliseconds);
}

Tenha em atenção que as condições de corrida são possíveis com a abordagem. Suponha que duas mensagens do mesmo drone chegam no mesmo lote de mensagens. Ao escrevê-las em paralelo, a mensagem anterior pode substituir a mensagem posterior. Para este cenário específico, a aplicação pode tolerar perder uma mensagem ocasional. Os drones enviam novos dados de posição a cada 5 segundos, para que os dados no Azure Cosmos DB sejam atualizados continuamente. Noutros cenários, no entanto, pode ser importante processar as mensagens estritamente por ordem.

Depois de implementar esta alteração de código, a aplicação conseguiu ingerir mais de 2500 pedidos/seg, utilizando um Hub IoT com 32 partições.

Considerações do lado do cliente

A experiência geral do cliente pode ser diminuída pela paralelização agressiva do lado do servidor. Considere utilizar a biblioteca do executor em massa do Azure Cosmos DB (não apresentada nesta implementação), o que reduz significativamente os recursos de computação do lado do cliente necessários para saturar o débito alocado a um contentor do Azure Cosmos DB. Uma única aplicação por thread que escreve dados com a API de importação em massa obtém quase dez vezes mais débito de escrita em comparação com uma aplicação com vários threads que escreve dados em paralelo ao saturar a CPU do computador cliente.

Resumo

Para este cenário, foram identificados os seguintes estrangulamentos:

  • Partição de escrita frequente, devido a um valor de chave de partição em falta nos documentos que estão a ser escritos.
  • Escrever documentos em série por Hub IoT partição.

Para diagnosticar estes problemas, a equipa de desenvolvimento baseou-se nas seguintes métricas:

  • Pedidos limitados no Azure Cosmos DB.
  • Mapa térmico da partição – RUs consumidas máximas por partição.
  • Mensagens recebidas versus documentos criados.
  • Latência da mensagem.

Passos seguintes

Rever anti-padrões de desempenho