Partilhar via


Processador do feed de alterações no Azure Cosmos DB

APLICA-SE A: NoSQL

O processador de feed de alterações faz parte dos SDKs do Azure Cosmos DB .NET V3 e Java V4 . Ele simplifica o processo de leitura do feed de alterações e distribui o processamento de eventos entre vários consumidores de forma eficaz.

O principal benefício de usar o processador de alimentação de alterações é seu design tolerante a falhas, que garante uma entrega "pelo menos uma vez" de todos os eventos no feed de alterações.

SDKs suportados

.Net V3 Java Node.JS Python

Componentes do processador de alimentação de alterações

O processador de alimentação de alterações tem quatro componentes principais:

  • O contêiner monitorado: o contêiner monitorado tem os dados a partir dos quais a alimentação de alteração é gerada. Quaisquer inserções e atualizações ao contentor monitorizado serão refletidas no feed de alterações do contentor.

  • O contêiner de locação: o contêiner de locação atua como armazenamento de estado e coordena o processamento da alimentação de alterações entre vários trabalhadores. Pode armazenar o contentor de concessão na mesma conta que o contentor monitorizado ou numa conta separada.

  • A instância de computação: uma instância de computação hospeda o processador de feed de alterações para escutar as alterações. Dependendo da plataforma, ela pode ser representada por uma máquina virtual (VM), um pod do Kubernetes, uma instância do Serviço de Aplicativo do Azure ou uma máquina física real. A instância de computação tem um identificador exclusivo que é chamado de nome da instância ao longo deste artigo.

  • O delegado: o delegado é o código que define o que você, o desenvolvedor, deseja fazer com cada lote de alterações que o processador de feed de alterações lê.

Para entender melhor como esses quatro elementos do processador de alimentação de alterações funcionam juntos, vejamos um exemplo no diagrama a seguir. O contêiner monitorado armazena itens e usa 'City' como a chave de partição. Os valores da chave de partição são distribuídos em intervalos (cada intervalo representa uma partição física) que contêm itens.

O diagrama mostra duas instâncias de computação e o processador de alimentação de alterações atribui intervalos diferentes a cada instância para maximizar a distribuição de computação. Cada instância tem um nome diferente e exclusivo.

Cada intervalo é lido em paralelo. O progresso de um intervalo é mantido separadamente de outros intervalos no contêiner de concessão por meio de um documento de concessão . A combinação das concessões representa o estado atual do processador de alimentação de alterações.

Exemplo de processador de alimentação de alterações

Implementar o processador de alimentação de alterações

O processador de feed de alterações no .NET está disponível para o modo de versão mais recente e para todas as versões e modo de exclusão. Todas as versões e exclusões do modo estão em visualização e são suportados para o processador de feed de alterações a partir da versão 3.40.0-preview.0. O ponto de entrada para ambos os modos é sempre o contêiner monitorado.

Para ler usando o modo de versão mais recente, em uma Container instância, você chama GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Para ler usando todas as versões e o modo de exclusão, chame GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes a partir da Container instância:

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Para ambos os modos, o primeiro parâmetro é um nome distinto que descreve o objetivo deste processador. O segundo nome é a implementação delegada que lida com alterações.

Aqui está um exemplo de um delegado para o modo de versão mais recente:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Aqui está um exemplo de um delegado para todas as versões e modo de exclusão:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

Depois, você define o nome da instância de computação ou o identificador exclusivo usando WithInstanceName. O nome da instância de computação deve ser exclusivo e diferente para cada instância de computação que você está implantando. Você define o contêiner para manter o estado de concessão usando WithLeaseContainer.

A chamada Build fornece a instância do processador que você pode iniciar chamando StartAsync.

Nota

Os trechos de código anteriores são retirados de amostras no GitHub. Você pode obter a amostra para o modo de versão mais recente ou todas as versões e exclui o modo.

Ciclo de vida do processamento

O ciclo de vida normal de uma instância do anfitrião é:

  1. Leitura do feed de alterações.
  2. Se não houver alterações, suspenda por um período de tempo predefinido (personalizável usando WithPollInterval no Builder) e vá para #1.
  3. Se houver alterações, envie-as ao delegado.
  4. Quando o delegado terminar de processar as alterações com êxito, atualize o repositório de concessão com o último point-in-time processado e vá para #1.

Processamento de erros

O processador de alimentação de alterações é resiliente a erros de código do usuário. Se sua implementação de representante tiver uma exceção não tratada (etapa #4), o thread que está processando esse lote específico de alterações será interrompido e um novo thread será eventualmente criado. O novo thread verifica o último ponto no tempo que o repositório de concessão salvou para esse intervalo de valores de chave de partição. O novo thread reinicia a partir daí, enviando efetivamente o mesmo lote de alterações para o delegado. Esse comportamento continua até que o delegado processe as alterações corretamente, e é a razão pela qual o processador de feed de alterações tem uma garantia de "pelo menos uma vez".

Nota

Em apenas um cenário, um lote de alterações não é repetido. Se a falha ocorrer na primeira execução de delegado, o repositório de concessão não terá nenhum estado salvo anterior para ser usado na nova tentativa. Nesses casos, a nova tentativa usa a configuração inicial inicial, que pode ou não incluir o último lote.

Para evitar que o processador de feed de alterações fique "preso" continuamente repetindo o mesmo lote de alterações, você deve adicionar lógica no código do delegado para escrever documentos, mediante exceção, em uma fila de mensagens com erro. Esse design garante que você possa acompanhar as alterações não processadas enquanto ainda pode continuar a processar alterações futuras. A fila de mensagens com erro pode ser outro contêiner do Azure Cosmos DB. O armazenamento exato de dados não importa. Você simplesmente quer que as alterações não processadas sejam persistentes.

Você também pode usar o estimador de feed de alterações para monitorar o progresso das instâncias do processador de feed de alterações à medida que elas leem o feed de alterações, ou pode usar notificações de ciclo de vida para detetar falhas subjacentes.

Notificações do ciclo de vida

Você pode conectar o processador de alimentação de alterações a qualquer evento relevante em seu ciclo de vida. Você pode optar por ser notificado para um ou todos eles. A recomendação é pelo menos registrar a notificação de erro:

  • Registre um manipulador para WithLeaseAcquireNotification ser notificado quando o host atual adquirir uma concessão para começar a processá-la.
  • Registre um manipulador para WithLeaseReleaseNotification ser notificado quando o host atual liberar uma concessão e parar de processá-la.
  • Registre um manipulador para WithErrorNotification ser notificado quando o host atual encontrar uma exceção durante o processamento. Você precisa ser capaz de distinguir se a origem é o delegado do usuário (uma exceção não tratada) ou um erro que o processador encontra quando tenta acessar o contêiner monitorado (por exemplo, problemas de rede).

As notificações do ciclo de vida estão disponíveis em ambos os modos de alimentação de alterações. Aqui está um exemplo de notificações de ciclo de vida no modo de versão mais recente:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Unidade de implantação

Uma única unidade de implantação do processador de feed de alterações consiste em uma ou mais instâncias de computação que têm o mesmo valor e processorName a mesma configuração de contêiner de concessão, mas nomes de instância diferentes. Você pode ter muitas unidades de implantação nas quais cada unidade tem um fluxo de negócios diferente para as alterações e cada unidade de implantação consiste em uma ou mais instâncias.

Por exemplo, você pode ter uma unidade de implantação que aciona uma API externa sempre que há uma alteração no contêiner. Outra unidade de implantação pode mover dados em tempo real sempre que houver uma alteração. Quando ocorre uma alteração no contêiner monitorado, todas as unidades de implantação são notificadas.

Dimensionamento dinâmico

Como mencionado anteriormente, dentro de uma unidade de implantação, você pode ter uma ou mais instâncias de computação. Para aproveitar a distribuição de computação dentro da unidade de implantação, os únicos requisitos principais são que:

  • Todas as instâncias devem ter a mesma configuração de contentor de concessão.
  • Todas as instâncias devem ter o mesmo valor para processorName.
  • Cada instância precisa de ter um nome de instância diferente (WithInstanceName).

Se essas três condições se aplicarem, o processador de alimentação de alterações distribuirá todas as concessões que estão no contêiner de concessão em todas as instâncias em execução dessa unidade de implantação e paralelizará a computação usando um algoritmo de distribuição igual. Uma concessão pertence a uma instância a qualquer momento, portanto, o número de instâncias não deve ser maior do que o número de locações.

O número de instâncias pode crescer e diminuir. O processador de alimentação de alterações ajusta dinamicamente a carga, redistribuindo-a de acordo.

Além disso, o processador de alimentação de alterações pode ajustar dinamicamente a escala de um contêiner se a taxa de transferência ou o armazenamento do contêiner aumentarem. Quando o contêiner cresce, o processador de alimentação de alterações lida com o cenário de forma transparente, aumentando dinamicamente as concessões e distribuindo as novas concessões entre as instâncias existentes.

Hora de início

Por padrão, quando um processador de alimentação de alterações é iniciado pela primeira vez, ele inicializa o contêiner de concessão e inicia seu ciclo de vida de processamento. Quaisquer alterações que tenham acontecido no contêiner monitorado antes que o processador de alimentação de alterações seja inicializado pela primeira vez não são detetadas.

Leitura de uma data e hora anteriores

É possível inicializar o processador de feed de alterações para ler as alterações a partir de uma data e hora específicas passando uma instância de para a WithStartTime extensão do DateTime construtor:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

O processador de alimentação de alterações é inicializado para essa data e hora específicas e começa a ler as alterações que aconteceram depois.

Leitura desde o início

Em outros cenários, como em migrações de dados ou se você estiver analisando todo o histórico de um contêiner, precisará ler o feed de alterações desde o início da vida útil desse contêiner. Você pode usar WithStartTime na extensão builder, mas pass DateTime.MinValue.ToUniversalTime(), que gera a representação UTC do valor mínimo DateTime como neste exemplo:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

O processador de alimentação de alterações é inicializado e começa a ler as alterações desde o início da vida útil do contêiner.

Nota

Essas opções de personalização funcionam apenas para configurar o ponto inicial no tempo do processador de feed de alterações. Depois que o contêiner de concessão é inicializado pela primeira vez, a alteração dessas opções não tem efeito.

A personalização do ponto de partida só está disponível para o modo de alimentação de alteração de versão mais recente. Ao usar todas as versões e o modo de exclusão, você deve começar a ler a partir do momento em que o processador é iniciado ou retomar a partir de um estado de concessão anterior que esteja dentro do período de retenção de backup contínuo da sua conta.

Alterar feed e taxa de transferência provisionada

Altere as operações de leitura de feed nas unidades de solicitação de consumo de contêiner monitorado. Certifique-se de que o contêiner monitorado não está com limitação. A limitação adiciona atrasos no recebimento de eventos de feed de alterações em seus processadores.

As operações no contêiner de locação (atualização e manutenção do estado) consomem unidades de solicitação. Quanto maior o número de instâncias que usam o mesmo contêiner de concessão, maior o consumo potencial de unidades de solicitação. Certifique-se de que seu contêiner de locação não está passando por limitação. A limitação adiciona atrasos no recebimento de eventos de feed de alterações. A limitação pode até mesmo encerrar completamente o processamento.

Partilhar o contentor de arrendamento

Você pode compartilhar um contêiner de concessão entre várias unidades de implantação. Em um contêiner de concessão compartilhada, cada unidade de implantação escuta um contêiner monitorado diferente ou tem um valor diferente para processorName. Nessa configuração, cada unidade de implantação mantém um estado independente no contêiner de concessão. Analise o consumo da unidade de solicitação em um contêiner de concessão para certificar-se de que a taxa de transferência provisionada é suficiente para todas as unidades de implantação.

Configuração avançada de arrendamento

Três configurações principais podem afetar o funcionamento do processador de alimentação de alterações. Cada configuração afeta o consumo da unidade de solicitação no contêiner de locação. Você pode definir uma destas configurações ao criar o processador de feed de alterações, mas use-as com cuidado:

  • Aquisição de locação: por padrão, a cada 17 segundos. Um host verifica periodicamente o estado do repositório de locação e considera a aquisição de locações como parte do processo de dimensionamento dinâmico. Esse processo é feito executando uma consulta no contêiner de concessão. A redução desse valor torna o reequilíbrio e a aquisição de locações mais rápidos, mas aumenta o consumo de unidades solicitadas no contêiner de locação.
  • Expiração da locação: Por padrão, 60 segundos. Define a quantidade máxima de tempo que uma concessão pode existir sem qualquer atividade de renovação antes de ser adquirida por outro host. Quando um host falha, as concessões que ele possuía são retiradas por outros hosts após esse período de tempo, mais o intervalo de renovação configurado. Reduzir esse valor torna a recuperação após uma falha de host mais rápida, mas o valor de expiração nunca deve ser menor do que o intervalo de renovação.
  • Renovação de Locação: Por padrão, a cada 13 segundos. Um anfitrião que possui um contrato de arrendamento renova periodicamente o contrato, mesmo que não haja novas alterações para consumir. Este processo é feito através da execução de um Replace na concessão. A redução desse valor reduz o tempo necessário para detetar concessões perdidas por uma falha de host, mas aumenta o consumo de unidade de solicitação no contêiner de locação.

Onde hospedar o processador de alimentação de alterações

O processador de feed de alterações pode ser hospedado em qualquer plataforma que suporte processos ou tarefas de longa execução. Seguem-se alguns exemplos:

Embora o processador de alimentação de alterações possa ser executado em ambientes de curta duração porque o contêiner de concessão mantém o estado, o ciclo de inicialização desses ambientes adiciona atrasos ao tempo necessário para receber notificações (devido à sobrecarga de iniciar o processador toda vez que o ambiente é iniciado).

Requisitos de acesso baseados em funções

Ao usar o Microsoft Entra ID como mecanismo de autenticação, verifique se a identidade tem as permissões adequadas:

  • No contentor monitorizado:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • No contentor de locação:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

Recursos adicionais

Próximos passos

Saiba mais sobre o processador de alimentação de alterações nos seguintes artigos: