Azure Cosmos DB の変更フィードを探索する
Azure Cosmos DB の変更フィードは、コンテナーに対して行われた変更が発生した順番で記録されている永続的な記録です。 Azure Cosmos DB の変更フィードのサポートは、Azure Cosmos DB コンテナーの変更をリッスンすることで機能します。 変更されたドキュメントは、変更された順に並べ替えられた一覧に出力されます。 永続的な変更は非同期的に増分処理できます。また、出力を 1 つ以上のコンシューマーに分散させて並列処理することもできます。
変更フィードとさまざまな操作
現在、変更フィードにはすべての挿入と更新が表示されています。 特定の種類の操作について、変更フィードをフィルター処理することはできません。 現在、変更フィードでは削除操作は記録されません。 回避策として、削除される項目にソフト マーカーを追加できます。 たとえば、項目に "deleted" という名前の属性を追加し、その値を "true" に設定してから、項目に Time to Live (TTL) 値を設定できます。 TTL を設定すると、項目が自動的に削除されるようになります。
Azure Cosmos DB の変更フィードの読み取り
Azure Cosmos DB の変更フィードは、プッシュ モデルかプル モデルのいずれかを使用して操作できます。 プッシュ モデルでは、変更フィード プロセッサによって、作業が、その作業を処理するためのビジネス ロジックを備えたクライアントにプッシュされます。 ただし、作業を確認して最後に処理された作業の状態を保存する場合の複雑さは、変更フィード プロセッサ内で処理されます。
プル モデルでは、クライアントによって作業がサーバーからプルされる必要があります。 この場合、クライアントは作業を処理するためのビジネス ロジックを備え、最後に処理された作業の状態も格納します。 クライアントは、作業を並列処理する複数のクライアント間での負荷分散を処理し、エラーを処理します。
Note
今後の変更に対する変更フィードのポーリング、最後に処理された変更の状態の保存、その他の利点について心配する必要がないため、プッシュ モデルを使用することをお勧めします。
Azure Cosmos DB の変更フィードを使用するほとんどのシナリオでは、プッシュ モデルのオプションのうち 1 つが使用されます。 ただし、シナリオによっては、プル モデルの低レベルの制御がさらに必要になる場合があります。 追加の低レベルの制御には、次のものが含まれます。
- 特定のパーティション キーから変更を読み取る
- クライアントが処理のために変更を受け取るペースを制御する
- 変更フィードの既存のデータを 1 回だけ読み取る (データ移行を行う場合など)
プッシュ モデルを使用した変更フィードの読み取り
変更フィードからの読み取りをプッシュ モデルで行うには 2 つの方法があり、Azure Functions Azure Cosmos DB トリガー、または変更フィード プロセッサ ライブラリを使います。 Azure Functions では、変更フィード プロセッサがバックグラウンドで使用されるため、これらの変更フィードの読み取り方法はどちらも似ています。 Azure Functions は、まったく異なる読み取り方法ではなく、単に変更フィード プロセッサのホスティング プラットフォームであると考えてください。 Azure Functions のバックグラウンドでは変更フィード プロセッサが使われます。 それによって、変更処理がコンテナーのパーティション間で自動的に並列化されます。
Azure Functions
Azure Cosmos DB コンテナーの変更フィードでの新しいイベントのたびに自動的にトリガーされる、小さな事後対応型 Azure Functions を作成できます。 Azure Cosmos DB 用 Azure Functions トリガーを使用すると、worker インフラストラクチャを保持する必要なく、変更フィード プロセッサのスケーリングと信頼性の高いイベント検出機能を使用することができます。
変更フィード プロセッサ
変更フィード プロセッサは Azure Cosmos DB .NET V3 および Java V4 SDK の一部です。 それにより、変更フィードを読み取り、イベント処理を複数のコンシューマーに効率的に分散させるプロセスが簡単になります。
変更フィード プロセッサの実装には、4 つの主要なコンポーネントがあります。
監視対象コンテナー: 監視対象コンテナーには、変更フィードの生成元となるデータが含まれています。 監視対象コンテナーに対する挿入と更新が、コンテナーの変更フィードに反映されます。
リース コンテナー: リース コンテナーは、状態ストレージとして機能し、複数の worker 間での変更フィードの処理を調整します。 リース コンテナーは、監視対象コンテナーと同じアカウントまたは別のアカウントに格納できます。
コンピューティング インスタンス: コンピューティング インスタンスでは、変更をリッスンする変更フィード プロセッサをホストします。 プラットフォームによっては、それは VM、Kubernetes ポッド、Azure App Service インスタンス、実際の物理コンピューターによって表される場合があります。 これには、この記事全体を通して "インスタンス名" と呼ばれている、一意の識別子があります。
デリゲート: デリゲートは、変更フィード プロセッサによって読み取られる変更の各バッチについて、開発者が行いたいことが定義されているコードです。
変更フィード プロセッサを実装する場合、エントリのポイントは常に監視対象のコンテナーであり、Container
インスタンスから GetChangeFeedProcessorBuilder
を呼び出します。
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
1 番目のパラメーターは、このプロセッサの目的を説明する一意の名前です。2 番目の名前は、変更を処理するデリゲートの実装です。 デリゲートの例を次に示します。
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
その後、WithInstanceName
を使用してコンピューティング インスタンスの名前または一意の識別子を定義します。これは、デプロイしようとしているコンピューティング インスタンスごとに一意で異なっている必要があります。最後に、WithLeaseContainer
を使用して、リース状態を保持するコンテナーを定義します。
Build
を呼び出すとプロセッサ インスタンスが提供され、StartAsync
を呼び出すとこれを開始できます。
ホスト インスタンスの通常のライフ サイクルは次のとおりです。
- 変更フィードを読み取ります。
- 変更がない場合は、事前に定義された時間 (
Builder
のWithPollInterval
でカスタマイズ可能) だけスリープし、#1 に移ります。 - 変更がある場合は、それらをデリゲートに送信します。
- デリゲートによる変更の処理が正常に完了したら、最後に処理された時点でリース ストアを更新し、#1 に移ります。