Esplorare il feed di modifiche in Azure Cosmos DB

Completato

Il feed di modifiche in Azure Cosmos DB è un record permanente delle modifiche apportate a un contenitore nell'ordine in cui si verificano. Il supporto del feed di modifiche in Azure Cosmos DB è in ascolto di eventuali modifiche in un contenitore di Azure Cosmos DB. Restituisce quindi l'elenco di documenti cambiati nell'ordine in cui sono stati modificati. Le modifiche permanenti possono essere elaborate in modo asincrono e incrementale e l'output può essere distribuito a uno o più consumer per l'elaborazione parallela.

Feed di modifiche e operazioni diverse

Attualmente, nel feed di modifiche vengono visualizzati tutti gli inserimenti e gli aggiornamenti e non è possibile filtrare il feed di modifiche per un tipo specifico di operazione. Attualmente, le operazioni di eliminazione non vengono registrate nel feed di modifiche. Come soluzione alternativa, è possibile aggiungere un soft marker sugli elementi da eliminare. Ad esempio, è possibile aggiungere un attributo denominato "eliminato" nell'elemento, impostarne il valore su "true", quindi impostare un valore TTL (Time-to-Live) sull'elemento. L'impostazione della durata (TTL) garantisce che l'elemento venga eliminato automaticamente.

Lettura del feed di modifiche in Azure Cosmos DB

È possibile usare il feed di modifiche di Azure Cosmos DB con un modello push o un modello pull. Con un modello push, il processore del feed di modifiche esegue il push del lavoro a un client che ha la logica di business per l'elaborazione di questo lavoro. Tuttavia, la complessità correlata alla verifica dello stato di archiviazione per l'ultimo lavoro elaborato viene gestita all'interno del processore del feed di modifiche.

Con un modello di pull, il client deve eseguire il pull del lavoro dal server. In questo caso, il client ha una logica di business per l'elaborazione del lavoro e memorizza inoltre lo stato dell'ultimo lavoro elaborato. Il client gestisce il bilanciamento del carico in più client che elaborano il lavoro in parallelo e gestiscono gli errori.

Nota

È consigliabile usare il modello push perché non è necessario preoccuparsi di eseguire il polling del feed di modifiche per le modifiche future, archiviando lo stato per l'ultima modifica elaborata e altri vantaggi.

Nella maggior parte degli scenari in cui si usa il feed di modifiche di Azure Cosmos DB verrà adottata una delle opzioni del modello push. Tuttavia, per alcuni scenari potrebbe essere necessario un controllo extra di basso livello del modello pull. Il controllo extra di basso livello include:

  • Lettura delle modifiche di una chiave di partizione specifica
  • Controllo della velocità di ricezione delle modifiche per l'elaborazione da parte del client
  • Esecuzione di una sola lettura dei dati esistenti nel feed di modifiche (ad esempio per eseguire una migrazione dei dati)

Lettura del feed di modifiche con un modello push

Esistono due modi per leggere dal feed di modifiche con un modello push: Trigger di Funzioni di Azure per Azure Cosmos DB e la libreria del processore del feed di modifiche. Funzioni di Azure usa il processore del feed di modifiche dietro le quinte, quindi sono due modi molto simili di leggere il feed di modifiche. È possibile considerare Funzioni di Azure come una semplice piattaforma di hosting per il processore dei feed di modifiche, non un modo completamente diverso di leggere il feed di modifiche. Funzioni di Azure usa il processore feed di modifiche. Parallelizza automaticamente l'elaborazione delle modifiche nelle partizioni del contenitore.

Funzioni di Azure

È possibile creare piccole istanze reattive di Funzioni di Azure che verranno attivate automaticamente a ogni nuovo evento nel feed di modifiche del contenitore di Azure Cosmos DB. Con il trigger di Funzioni di Azure per Azure Cosmos DB, è possibile usare la funzionalità scalabile e affidabile di rilevamento eventi del processore del feed di modifiche senza la necessità di gestire un'infrastruttura di ruoli di lavoro.

Diagramma che mostra il feed di modifiche che attiva Funzioni di Azure per l'elaborazione.

Processore dei feed di modifiche

Il processore del feed di modifiche fa parte degli SDK DB .NET V3 e Java V4 di Azure Cosmos DB. Semplifica il processo di lettura del feed di modifiche e distribuisce in modo efficace l'elaborazione degli eventi tra più consumer.

Ci sono quattro componenti principali per l'implementazione del processore del feed di modifiche:

  1. Contenitore monitorato: il contenitore monitorato include i dati da cui viene generato il feed di modifiche. Eventuali inserimenti e aggiornamenti nel contenitore monitorato si riflettono nel feed di modifiche del contenitore.

  2. Contenitore di lease: il contenitore di lease agisce come una risorsa di archiviazione di stato e coordina l'elaborazione del feed di modifiche tra più ruoli di lavoro. Il contenitore di lease può essere archiviato nello stesso account del contenitore monitorato o in un account diverso.

  3. Istanza di calcolo: un'istanza di calcolo ospita il processore del feed di modifiche per l'ascolto delle modifiche. A seconda della piattaforma, può essere rappresentata da una macchina virtuale, da un pod di Kubernetes, da un'istanza di Servizio app di Azure o da un computer fisico effettivo. Presenta un identificatore univoco, a cui in questo articolo si fa riferimento con il nome dell'istanza.

  4. Delegato: il delegato è il codice che definisce le operazioni che lo sviluppatore vuole eseguire con ogni batch di modifiche letto dal processore del feed di modifiche.

Quando si implementa il processore del feed di modifiche, il punto di ingresso è sempre il contenitore monitorato, da un'istanza di Container si chiama GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Il primo parametro è un nome distinto che descrive l'obiettivo di questo elaboratore, mentre il secondo nome è l'implementazione del delegato che gestisce la modifica. Di seguito è riportato un esempio di delegato:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Successivamente, si definisce il nome o l'identificatore univoco dell'istanza di calcolo con WithInstanceName, che deve essere univoco e diverso in ogni istanza di calcolo da distribuire e, infine, che rappresenta il contenitore per mantenere lo stato del lease con WithLeaseContainer.

Chiamando Build, si otterrà l'istanza del processore che è possibile avviare chiamando StartAsync.

Il normale ciclo di vita di un'istanza dell'host è:

  1. Leggere il feed di modifiche.
  2. Se non sono state apportate modifiche, sospendere per un periodo di tempo predefinito personalizzabile con WithPollInterval nel Builder e passare a #1.
  3. Se ci sono modifiche, inviarle al delegato.
  4. Quando il delegato termina correttamente l'elaborazione delle modifiche, aggiornare l'archivio dei lease con l'ultimo momento elaborato e passare a 1.