Erkunden des Änderungsfeeds in Azure Cosmos DB

Abgeschlossen

Der Änderungsfeed in Azure Cosmos DB ist eine persistente Aufzeichnung der Änderungen an einem Container in der Reihenfolge ihres Auftretens. Zur Unterstützung des Änderungsfeeds in Azure Cosmos DB wird gelauscht, ob in einem Azure Cosmos DB-Container Änderungen auftreten. Anschließend wird die sortierte Liste von geänderten Dokumenten in der Reihenfolge ausgegeben, in der sie geändert wurden. Die persistenten Änderungen können asynchron und inkrementell verarbeitet werden, und die Ausgabe kann über einen oder mehrere Consumer für die Parallelverarbeitung verteilt werden.

Änderungsfeed und verschiedene Vorgänge

Heute werden alle Einfügungen und Updates im Änderungsfeed angezeigt. Sie können den Änderungsfeed nicht nach einen bestimmten Vorgangstyp filtern. Aktuell werden im Änderungsfeed keine Löschvorgänge protokolliert. Als Problemumgehung können Sie den zu löschenden Elementen eine schwache Markierung hinzufügen. Sie können dem Element beispielsweise ein Attribut mit dem Namen „deleted“ hinzufügen, seinen Wert auf „true“ festlegen und dann eine Gültigkeitsdauer (Time-To-Live, TTL) für das Element festlegen. Durch Festlegen der Gültigkeitsdauer wird sichergestellt, dass das Element automatisch gelöscht wird.

Lesen des Azure Cosmos DB-Änderungsfeeds

Für die Arbeit mit dem Azure Cosmos DB-Änderungsfeed können Sie ein Pushmodell oder ein Pullmodell verwenden. Bei einem Pushmodell pusht der Änderungsfeedprozessor die Aufgaben an einen Client, der über Geschäftslogik zur Verarbeitung dieser Aufgaben verfügt. Die Komplexität der Überprüfung auf Aufgaben und das Speichern des Zustands der zuletzt verarbeiteten Aufgaben wird jedoch im Änderungsfeedprozessor behandelt.

Bei einem Pullmodell muss der Client die Aufgaben vom Server abrufen. In diesem Fall verfügt der Client über Geschäftslogik für die Verarbeitung von Arbeit und speichert auch den Zustand für die zuletzt verarbeitete Arbeit. Der Client verarbeitet den Lastenausgleich für mehrere Clients, die parallel verarbeitet werden und Fehler behandeln.

Hinweis

Es wird empfohlen, das Pushmodell zu verwenden, da Sie sich nicht darum kümmern müssen, zukünftige Änderungen vom Änderungsfeed abzurufen oder den Status der letzten verarbeiteten Änderung zu speichern, und von weiteren Vorteilen profitieren.

In den meisten Szenarien, in denen der Azure Cosmos DB-Änderungsfeed verwendet wird, wird eine der Pushmodelloptionen verwendet. Es gibt jedoch einige Szenarios, in denen Sie die zusätzliche, besonders präzise Kontrolle des Pullmodells benötigen. Die besonders präzise Kontrolle umfasst:

  • Lesen von Änderungen von einem bestimmten Partitionsschlüssel
  • Steuern der Geschwindigkeit, mit der der Client Änderungen für die Verarbeitung empfängt
  • Ausführen eines einmaligen Lesevorgangs über die vorhandenen Daten im Änderungsfeed (z. B. für eine Datenmigration)

Lesen des Änderungsfeeds mit einem Pushmodell

Es gibt zwei Möglichkeiten, Daten mithilfe eines Pushmodells aus dem Änderungsfeed zu lesen: Azure Functions-Trigger für Azure Cosmos DB und die Change Feed Processor-Bibliothek. Azure Functions verwendet im Hintergrund den Änderungsfeedprozessor, sodass beide Methoden zum Lesen des Änderungsfeeds sehr ähnlich sind. Stellen Sie sich Azure Functions einfach als Hostingplattform für den Änderungsfeedprozessor vor und nicht als völlig andere Form, den Änderungsfeed zu lesen. Azure Functions verwendet unter der Haube den Änderungsfeedprozessor. Die Änderungsverarbeitung wird automatisch in den Partitionen Ihres Containers parallelisiert.

Azure-Funktionen

Sie können kleine reaktive Azure-Funktionen erstellen, die automatisch bei jedem neuen Ereignis im Änderungsfeed Ihres Azure Cosmos DB-Containers ausgelöst werden. Mit dem Azure Functions-Trigger für Azure Cosmos DB können Sie die Skalierung des Änderungsfeedprozessors sowie zuverlässige Funktionen für die Ereigniserkennung nutzen, ohne eine Workerinfrastruktur verwalten zu müssen.

Das Diagramm veranschaulicht, wie die Verarbeitung durch Azure Functions durch den Änderungsfeed ausgelöst wird.

Change Feed Processor

Der Änderungsfeedprozessor ist Teil der SDKs .NET V3 und Java V4 für Azure Cosmos DB. Er vereinfacht das Lesen des Änderungsfeeds sowie das effektive Verteilen der Ereignisverarbeitung auf mehrere Consumer.

Die Implementierung des Änderungsfeedprozessors umfasst vier Hauptkomponenten:

  1. Überwachter Container: Der überwachte Container enthält die Daten, aus denen der Änderungsfeed generiert wird. Alle Einfügungen und Aktualisierungen für den überwachten Container werden im Änderungsfeed des Containers berücksichtigt.

  2. Leasecontainer: Der Leasecontainer fungiert als Zustandsspeicher und koordiniert die Verarbeitung des Änderungsfeeds über mehrere Worker hinweg. Er kann im gleichen Konto wie der überwachte Container oder in einem separaten Konto gespeichert werden.

  3. Die Compute-Instanz: Eine Compute-Instanz hostet den Änderungsfeedprozessor, um Änderungen zu überwachen. Je nach Plattform kann dies durch einen virtuellen Computer, einen Kubernetes-Pod, eine Azure App Service-Instanz oder einen physischen Computer erzielt werden. Sie verfügt über einen eindeutigen Bezeichner, auf den in diesem Artikel als Instanzname verwiesen wird.

  4. Delegat: Der Delegat ist der Code, der definiert, wie Sie als Entwickler mit dem jeweiligen Batch von Änderungen verfahren möchten, der vom Änderungsfeedprozessor gelesen wurde.

Beim Implementieren des Änderungsfeedprozessors ist der Einstiegspunkt immer der überwachte Container einer Container-Instanz, von der Sie GetChangeFeedProcessorBuilder aufrufen:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Der erste Parameter ist hierbei ein eindeutiger Name, der das Ziel dieses Prozessors beschreibt. Der zweite Name ist die Delegatimplementierung zur Behandlung von Änderungen. Im Folgenden ein Beispiel für einen Delegat:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Anschließend definieren Sie mit WithInstanceName den Namen der Computeinstanz bzw. den eindeutigen Bezeichner. Dieser sollte in jeder bereitgestellten Computeinstanz unterschiedlich und eindeutig sein. Schließlich definieren Sie mit WithLeaseContainer den Container zum Verwalten des Leasestatus.

Durch Aufrufen von Build erhalten Sie die Prozessorinstanz, die dann durch Aufrufen von StartAsync gestartet werden kann.

Der normale Lebenszyklus einer Hostinstanz sieht wie folgt aus:

  1. Der Änderungsfeed wird gelesen.
  2. Sollten keine Änderungen vorhanden sein, wird nach einer vordefinierten Pause (die mithilfe von WithPollInterval in Builder angepasst werden kann) erneut Schritt 1 ausgeführt.
  3. Sind Änderungen vorhanden, werden sie an den Delegaten gesendet.
  4. Nach erfolgreicher Verarbeitung der Änderungen durch den Delegaten wird der Leasespeicher mit dem neuesten verarbeiteten Zeitpunkt aktualisiert, und es wird wieder Schritt 1 ausgeführt.