探索 Azure Cosmos DB 中的變更摘要

已完成

Azure Cosmos DB 中的變更摘要是容器變更發生順序的持續性記錄。 Azure Cosmos DB 中的變更摘支援的運作方式是接聽 Azure Cosmos DB 容器是否有變更。 接著依修改的順序來輸出已變更文件的排序清單。 持續性變更可透過非同步或遞增的方式處理,而輸出可以散發給一或多個取用者進行平行處理。

變更摘要和不同的作業

您現在可在變更摘要中查看所有插入和更新。 您無法篩選特定作業類型的變更摘要。 摘要變更目前不會記錄刪除作業。 作為因應措施,您可以於正在刪除的項目上新增虛標記。 例如,您可以在項目中新增名為「已刪除」的屬性,將其值設定為「true」,然後設定項目的存留時間 (TTL) 值。 設定 TTL 可確保自動刪除項目。

讀取 Azure Cosmos DB 變更摘要

您可以透過推送模型或提取模型來使用 Azure Cosmos DB 變更摘要。 透過推送模型,變更摘要處理器會將工作推送至具有處理此工作商務邏輯的用戶端。 然而,變更摘要處理器內將處理檢查工作和儲存上次處理工作狀態的複雜項目。

透過提取模型,用戶端必須從伺服器提取工作。 在此情況下,用戶端具有處理工作的商務邏輯,也會儲存上次處理工作的狀態。 用戶端會平行處理多個用戶端的負載平衡工作,以及處理錯誤。

注意

建議您使用推送模型,因為您無須擔心輪詢變更摘要以取得未來變更、儲存上次變更狀態及其他優點。

大部分使用 Azure Cosmos DB 變更摘要的案例都將使用其中一個推送模型選項。 不過,在某些情況下,您可能會想要對提取模型進行額外的低層級控制。 額外的低層級控制包括:

  • 透過特定分割區索引鍵讀取變更
  • 控制用戶端接收要處理變更的步調
  • 執行變更摘要中的現有資料一次性讀取 (例如,執行資料移轉)

透過推送模型讀取變更摘要

有兩種方式可讓您透過推送模型以從變更摘要中進行讀取:Azure Functions Azure Cosmos DB 觸發程序與變更摘要處理器程式庫。 Azure Functions 會在幕後使用變更摘要處理器,因此,這兩者讀取變更摘要的方式類似。 將 Azure Functions 視為變更摘要處理器的主控平台,而不是讀取變更摘要的完全不同方式。 Azure Functions 會在幕後使用變更摘要處理器。 它會自動跨容器分區平行處理變更處理。

Azure Functions

您可以建立小型反應式 Azure Functions,其會在 Azure Cosmos DB 容器的變更摘要中每個新事件上自動觸發。 您可以透過適用於 Azure Cosmos DB 的 Azure Functions 觸發程序以使用變更摘要處理器的調整和可靠事件偵測功能,而無須維護任何背景工作角色基礎結構。

圖表顯示觸發 Azure Functions 以進行處理的變更摘要。

變更摘要處理器

變更摘要處理器是 Azure Cosmos DB .NET V3Java V4 SDK 的一部分。 這可以簡化讀取變更摘要的程序,並有效將事件處理散發給多個取用者。

實作變更摘要處理器有四個主要元件:

  1. 受監視的容器:受監視的容器含有會產生變更摘要的資料。 對於受監視的容器所進行的任何插入和更新,都會反映在容器的變更摘要中。

  2. 租用容器:租用容器會作為狀態儲存體,並協調如何處理多個背景工作角色的變更摘要。 租用容器可以儲存在與受監視容器相同的帳戶中,或儲存在個別帳戶中。

  3. 計算執行個體:計算執行個體主控變更摘要處理器以接聽變更。 視平台而定,呈現方法可能為 VM、Kubernetes Pod、Azure App Service 執行個體、實際實體機器。 其具有本文中參考為執行個體名稱的唯一識別碼。

  4. 委派:委派是一種程式碼,定義身為開發人員的您想要對變更摘要處理器讀取的每個批次變更進行哪些動作。

實作變更摘要處理器時,進入點一律是受監視的容器,從您呼叫 GetChangeFeedProcessorBuilderContainer 執行個體:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

其中第一個參數是描述此處理器目標的不同名稱,而第二個名稱則是處理變更的委派實作。 委派的範例如下所示:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

之後,您會使用 WithInstanceName 定義計算執行個體名稱或唯一識別碼,這在您要部署的每個計算執行個體中應該是唯一且不同的,最後,這是使用 WithLeaseContainer 維護租用狀態的容器。

呼叫 Build 提供可藉由呼叫 StartAsync 來啟動的處理器執行個體。

主機執行個體的正常生命週期為:

  1. 讀取變更摘要。
  2. 如果沒有變更,則睡眠一段預先定義的時間 (可使用 Builder 中的 WithPollInterval 自訂),然後移至 #1。
  3. 如果有變更,則將其傳送至委派。
  4. 當委派順利完成處理變更時,請以最新的處理時間點更新租用存放區,並移至 #1。