Explore o feed de alterações no Azure Cosmos DB

Concluído

O feed de alterações no Azure Cosmos DB é um registro persistente de alterações em um contêiner na ordem em que ocorrem. O suporte do feed de alterações no Azure Cosmos DB funciona através da escuta de alterações num contentor do Azure Cosmos DB. Em seguida, disponibiliza a lista ordenada dos documentos que foram alterados, pela ordem pela qual foram modificados. As alterações persistentes podem ser processadas de forma assíncrona e incremental, e a saída pode ser distribuída entre um ou mais consumidores para processamento paralelo.

Alterar alimentação e operações diferentes

Hoje, você vê todas as inserções e atualizações no feed de alterações. Não é possível filtrar o feed de alterações para um tipo específico de operação. Atualmente, o feed de alterações não registra operações de exclusão. Como solução alternativa, você pode adicionar um marcador suave nos itens que estão sendo excluídos. Por exemplo, você pode adicionar um atributo no item chamado "excluído", definir seu valor como "true" e, em seguida, definir um valor TTL (time-to-live) no item. Definir o TTL garante que o item seja excluído automaticamente.

Leitura do feed de alterações do Azure Cosmos DB

Você pode trabalhar com o feed de alterações do Azure Cosmos DB usando um modelo push ou um modelo pull. Com um modelo push, o processador de alimentação de alterações envia o trabalho para um cliente que tem lógica de negócios para processar esse trabalho. No entanto, a complexidade na verificação do trabalho e no estado de armazenamento do último trabalho processado é tratada dentro do processador de alimentação de alterações.

Com um modelo pull, o cliente tem que puxar o trabalho do servidor. Nesse caso, o cliente tem lógica de negócios para processar o trabalho e também armazena o estado do último trabalho processado. O cliente lida com o balanceamento de carga entre vários clientes, processando o trabalho em paralelo e manipulando erros.

Nota

Recomenda-se usar o modelo push porque você não precisará se preocupar em sondar o feed de alterações para alterações futuras, armazenar o estado da última alteração processada e outros benefícios.

A maioria dos cenários que usam o feed de alterações do Azure Cosmos DB usa uma das opções do modelo de push. No entanto, há alguns cenários em que você pode querer o controle de nível extra baixo do modelo pull. O controlo de nível extra baixo inclui:

  • Alterações de leitura de uma chave de partição específica
  • Controlar o ritmo a que o seu cliente recebe alterações para processamento
  • Fazer uma leitura única dos dados existentes no feed de alterações (por exemplo, para fazer uma migração de dados)

Ler o feed de alterações com um modelo push

Há duas maneiras de ler o feed de alterações com um modelo de push: os gatilhos do Azure Cosmos DB do Azure Functions e a biblioteca do processador do feed de alterações. O Azure Functions usa o processador de feed de alterações nos bastidores, portanto, essas são maneiras semelhantes de ler o feed de alterações. Pense no Azure Functions como simplesmente uma plataforma de hospedagem para o processador de feed de alterações, não uma maneira totalmente diferente de ler o feed de alterações. O Azure Functions usa o processador de feed de alterações nos bastidores. Ele paraleliza automaticamente o processamento de alterações nas partições do seu contêiner.

Funções do Azure

Você pode criar pequenas Funções do Azure reativas que são acionadas automaticamente em cada novo evento no feed de alterações do contêiner do Azure Cosmos DB. Com o gatilho do Azure Functions para o Azure Cosmos DB, você pode usar a funcionalidade de dimensionamento e deteção de eventos confiável do Change Feed Processor sem a necessidade de manter qualquer infraestrutura de trabalho.

Diagrama mostrando o feed de alterações que aciona o Azure Functions para processamento.

Processador do feed de alterações

O processador de feed de alterações faz parte dos SDKs do Azure Cosmos DB .NET V3 e Java V4 . Ele simplifica o processo de leitura do feed de alterações e distribui o processamento de eventos entre vários consumidores de forma eficaz.

Existem quatro componentes principais da implementação do processador do feed de alterações:

  1. O contêiner monitorado: o contêiner monitorado tem os dados a partir dos quais a alimentação de alteração é gerada. Quaisquer inserções e atualizações ao contentor monitorizado serão refletidas no feed de alterações do contentor.

  2. O contêiner de locação: o contêiner de concessão atua como um armazenamento de estado e coordena o processamento da alimentação de alterações entre vários trabalhadores. Pode armazenar o contentor de concessão na mesma conta que o contentor monitorizado ou numa conta separada.

  3. A instância de computação: uma instância de computação hospeda o processador de feed de alterações para escutar as alterações. Dependendo da plataforma, ele pode ser representado por uma VM, um pod kubernetes, uma instância do Serviço de Aplicativo do Azure, uma máquina física real. Ele tem um identificador exclusivo referenciado como o nome da instância ao longo deste artigo.

  4. O delegado: o delegado é o código que define o que você, o desenvolvedor, deseja fazer com cada lote de alterações que o processador de feed de alterações lê.

Ao implementar o processador de alimentação de alterações, o ponto de entrada é sempre o contêiner monitorado, a partir de uma Container instância que você chama GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Onde o primeiro parâmetro é um nome distinto que descreve o objetivo deste processador e o segundo nome é a implementação delegada que lida com alterações. Segue-se um exemplo de um delegado:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Depois, você define o nome da instância de computação ou identificador exclusivo com WithInstanceName, isso deve ser exclusivo e diferente em cada instância de computação que você está implantando e, finalmente, qual é o contêiner para manter o estado de concessão com WithLeaseContainer.

A chamada Build fornece a instância do processador que você pode iniciar chamando StartAsync.

O ciclo de vida normal de uma instância do anfitrião é:

  1. Leitura do feed de alterações.
  2. Se não houver alterações, durma por um período de tempo predefinido (personalizável com WithPollInterval no ) e vá para # Builder1.
  3. Se existirem alterações, envie-as para o delegado.
  4. Quando o delegado terminar o processamento das alterações com êxito, atualize o arquivo de concessões com o último ponto anterior no tempo processado e avance para o número 1.