次の方法で共有


Azure Cosmos DB の変更フィード プロセッサ

適用対象: NoSQL

変更フィード プロセッサは Azure Cosmos DB .NET V3 および Java V4 SDK の一部です。 それにより、変更フィードを読み取り、イベント処理を複数のコンシューマーに効率的に分散させるプロセスが簡単になります。

変更フィード プロセッサを使用することの主な利点は、変更フィード内のすべてのイベントが確実に "少なくとも 1 回" は配信されるそのフォールト トレラントな設計です。

サポートされる SDK

.Net V3 Java Node.JS Python

変更フィード プロセッサのコンポーネント

変更フィード プロセッサには、次の 4 つの主要コンポーネントがあります。

  • 監視対象コンテナー: 監視対象コンテナーには、変更フィードの生成元となるデータが含まれています。 監視対象コンテナーに対する挿入と更新が、コンテナーの変更フィードに反映されます。

  • リース コンテナー: リース コンテナーは状態ストレージとして機能し、複数の worker 間での変更フィードの処理を調整します。 リース コンテナーは、監視対象コンテナーと同じアカウントまたは別のアカウントに格納できます。

  • コンピューティング インスタンス: コンピューティング インスタンスでは、変更をリッスンする変更フィード プロセッサをホストします。 プラットフォームによっては、これは仮想マシン (VM)、Kubernetes ポッド、Azure App Service インスタンス、または実際の物理マシンによって表される場合があります。 コンピューティング インスタンスには、この記事全体で "インスタンス名" と呼ぶ一意の識別子があります。

  • デリゲート: デリゲートは、変更フィード プロセッサによって読み取られる変更の各バッチについて、開発者が何を実行するかが定義されているコードです。

変更フィード プロセッサのこれら 4 要素の連携のしくみについて理解を深めるために、次の図の例を見てみましょう。 監視対象コンテナーでは、項目が保存され、パーティション キーとして "City" が使われます。 パーティション キーの値は、項目を含む範囲 (各範囲は物理パーティションを表す) 内に分散されています。

図には、2 つのコンピューティング インスタンスが示されており、変更フィード プロセッサは、コンピューティング分散を最大化するために各インスタンスに異なる範囲を割り当てます。 各インスタンスには、異なる固有の名前があります。

各範囲は並列で読み取られます。 範囲の進行状況は、"リース" ドキュメントを使用して、リース コンテナー内で他の範囲とは別に保持されます。 リースの組み合わせは、変更フィード プロセッサの現在の状態を表します。

変更フィード プロセッサの例

変更フィード プロセッサを実装する

.NET の変更フィード プロセッサは、最新バージョン モードと、すべてのバージョンと削除モードで使用できます。 すべてのバージョンと削除モードはプレビュー段階であり、バージョン 3.40.0-preview.0 以降の変更フィード プロセッサでサポートされています。 どちらのモードでも、エントリ ポイントは常に監視対象のコンテナーです。

最新バージョン モードを使用して読み取る場合は、Container インスタンスで GetChangeFeedProcessorBuilder を呼び出します。

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

すべてのバージョンと削除モードを使用して読み取る場合は、Container インスタンスから GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes を呼び出します。

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

どちらのモードでも、最初のパラメーターは、このプロセッサの目標を記述する個別の名前です。 2 番目の名前は、変更を処理するデリゲートの実装です。

最新バージョン モードのデリゲートの例を次に示します。

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

すべてのバージョンと削除モードのデリゲートの例を次に示します。

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

その後、WithInstanceName を使用してコンピューティング インスタンス名または一意識別子を定義します。 コンピューティング インスタンス名は一意であり、デプロイするコンピューティング インスタンスごとに異なる必要があります。 WithLeaseContainer を使用して、リース状態を維持するようにコンテナーを設定します。

Build を呼び出すとプロセッサ インスタンスが提供され、StartAsync を呼び出すことによってそれを開始できます。

Note

前のコード スニペットは、GitHub のサンプルから取得されています。 最新バージョン モードまたはすべてのバージョンと削除モードのサンプルを入手できます。

処理のライフ サイクル

ホスト インスタンスの通常のライフ サイクルは次のとおりです。

  1. 変更フィードを読み取ります。
  2. 変更がない場合は、事前に定義した時間 (ビルダーの WithPollInterval を使用してカスタマイズ可能) だけスリープし、1 に戻ります。
  3. 変更がある場合は、それらをデリゲートに送信します。
  4. デリゲートによる変更の処理が正常に完了すると、最後に処理された時点でリース ストアを更新し、#1 に移ります。

エラー処理

変更フィード プロセッサには、ユーザー コード エラーに対する回復性があります。 デリゲートの実装にハンドルされない例外がある場合 (ステップ 4)、その特定の変更バッチを処理しているスレッドは停止し、最終的には新しいスレッドが作成されます。 新しいスレッドは、リース ストアにその範囲のパーティション キー値が保存された最新の時点をチェックします。 新しいスレッドはそこから再開し、実質的に同じ変更のバッチをデリゲートに送信します。 デリゲートによって変更が正しく処理されるまで、この動作が続けられます。これにより、変更フィード プロセッサでは "少なくとも 1 回" が保証されます。

Note

変更のバッチが再試行されないシナリオが 1 つだけ存在します。 最初に発生したデリゲートの実行でエラーが発生した場合、リース ストアには再試行時に使用される以前の保存状態がありません。 このような場合、再試行では最初の開始構成が使用され、最後のバッチが含まれていない場合もあります。

変更フィード プロセッサが同じバッチの変更を継続的に再試行して "行き詰まる" ことがないように、例外が発生した場合に、ドキュメントに書き込むためのデリゲート コードのロジックをエラー メッセージ キューに追加する必要があります。 このように設計することで、今後の変更の処理を継続しながら、未処理の変更を追跡できます。 エラー メッセージ キューは、別の Azure Cosmos DB コンテナーである可能性があります。 具体的なデータ ストアは重要ではありません。 未処理の変更が永続化されることのみが必要です。

変更フィード推定機能を使用して、変更フィードを読み取る変更フィード プロセッサ インスタンスの進行状況を監視することもできます。または、ライフ サイクル通知を使用して基になっている障害を検出できます。

ライフ サイクル通知

変更フィード プロセッサは、そのライフ サイクル内の関連するイベントに接続できます。 1 つまたはすべての通知を受け取るように選択できます。 少なくとも、エラー通知を登録することをお勧めします。

  • WithLeaseAcquireNotification のハンドラーを登録して、現在のホストがリースを取得して処理を開始したときに通知されるようにします。
  • WithLeaseReleaseNotification のハンドラーを登録して、現在のホストがリースをリリースして処理を停止したときに通知されるようにします。
  • WithErrorNotification のハンドラーを登録すると、現在のホストが処理中に例外を検出したときに通知を受け取ります。 ソースがユーザー デリゲート (ハンドルされない例外) であるか、監視対象のコンテナーにアクセスしようとしたときにプロセッサで発生したエラー (ネットワークの問題など) であるかを区別できる必要があります。

ライフサイクル通知は、どちらの変更フィード モードでも使用できます。 最新バージョン モードでのライフサイクル通知の例を次に示します。

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

展開単位

変更フィード プロセッサの 1 つのデプロイ ユニットは、processorName の値が同じでリース コンテナー構成も同じであるが、インスタンス名が異なる 1 つ以上のコンピューティング インスタンスで構成されます。 多数のデプロイ ユニットを使用することができ、各ユニットは変更に対して異なるビジネス フローを備え、各デプロイ ユニットは 1 つまたは複数のインスタンスで構成されます。

たとえば、あるデプロイ ユニットは、コンテナーに変更が発生するたびに外部 API をトリガーします。 別のデプロイ ユニットでは、変更が発生するたびに、リアルタイムでデータを移動することができます。 監視対象コンテナーで変更が発生すると、すべてのデプロイ ユニットが通知を受け取ります。

動的スケーリング

前述したように、1 つのデプロイ ユニット内には、1 つまたは複数のコンピューティング インスタンスを使用できます。 デプロイ ユニット内でコンピューティングの分散を利用するには、次の要件のみが重要になります。

  • すべてのインスタンスのリース コンテナーの構成が同じである必要があります。
  • すべてのインスタンスの processorName の値が同じである必要があります。
  • 各インスタンスには、異なるインスタンス名が設定されている必要があります (WithInstanceName)。

これら 3 つの条件に当てはまる場合、変更フィード プロセッサでは、リース コンテナー内のすべてのリースが、そのデプロイ ユニットの実行中の全インスタンスに分散され、均等分散アルゴリズムを使ってコンピューティングが並列化されます。 1 つのリースは特定の時点で 1 つのインスタンスによって所有されるため、インスタンスの数をリースの数より大きくすることはできません。

インスタンス数は増減する可能性があります。 変更フィード プロセッサは、負荷をそれに応じて再分散することで動的に調整します。

さらに、コンテナーのスループットまたはストレージの増加に応じて、変更フィード プロセッサでコンテナーのスケールを動的に調整できます。 コンテナーが拡張されると、変更フィード プロセッサでは、リースを動的に増やし、既存のインスタンス間で新しいリースを分散することによって、このシナリオが透過的に処理されます。

開始時刻

既定では、変更フィード プロセッサは、初めて開始したときに、リース コンテナーを初期化し、その処理のライフ サイクルを開始します。 変更フィード プロセッサが初めて初期化される前に監視対象コンテナー内で発生した変更は検出されません。

以前の日時からの読み取り

DateTime のインスタンスを WithStartTime ビルダー拡張機能に渡すことで、"特定の日時" 以降の変更を読み取るよう変更フィード プロセッサを初期化することができます。

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

変更フィード プロセッサは、その特定の日時に対して初期化され、それ以降に発生した変更の読み取りを開始します。

最初からの読み取り

データの移行やコンテナーの履歴全体の分析など、他のシナリオでは、"そのコンテナーの有効期間の最初" から変更フィードを読み取る必要があります。 ビルダー拡張機能で WithStartTime を使用できますが、DateTime.MinValue.ToUniversalTime() を渡すと、次の例のように DateTime の最小値の UTC 表現が生成されます。

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

変更フィード プロセッサは初期化され、コンテナーの有効期間の最初から変更の読み取りを開始します。

Note

これらのカスタマイズ オプションは、変更フィード プロセッサの開始時点を設定するためだけに機能します。 リース コンテナーが初めて初期化された後、これらのオプションを変更しても効果はありません。

開始ポイントのカスタマイズは、最新バージョンの変更フィード モードでのみ使用できます。 すべてのバージョンと削除モードを使用する場合は、プロセッサの起動時から読み取りを開始するか、アカウントの継続的バックアップの保持期間内にある以前のリース状態から再開する必要があります。

変更フィードとプロビジョニング済みスループット

監視対象コンテナーで変更フィード読み取り操作を行うと、要求ユニットが消費されます。 監視対象コンテナーで調整が発生していないことを確認してください。 調整により、プロセッサーでの変更フィード イベントの受信に遅延が発生します。

リース コンテナーに対する操作 (状態の更新と保守) では、要求ユニットが消費されます。 同じリース コンテナーを使用しているインスタンスの数が多いほど、要求ユニットの潜在的な消費量が多くなります。 リース コンテナーで調整が発生していないことを確認してください。 調整により、変更フィード イベントの受信に遅延が発生します。 調整によって処理が完全に終了する可能性もあります。

リース コンテナーを共有する

リース コンテナーは複数のデプロイ ユニット間で共有できます。 共有リース コンテナーでは、各デプロイ ユニットは別々監視対象コンテナーをリッスンするか、processorName の値が異なります。 この構成では、各デプロイ ユニットは、リース コンテナーに対して独立した状態を維持します。 リース コンテナーでの要求ユニットの消費量を確認し、プロビジョニングされたスループットがすべてのデプロイ ユニットに対して十分であることを確認してください。

高度なリース構成

3 つの主要な構成が、変更フィード プロセッサの動作に影響を与える可能性があります。 各構成は、リース コンテナーの要求ユニットの消費量に影響します。 変更フィード プロセッサを作成するときに、これらの構成のいずれかを設定できますが、慎重に使用してください。

  • リース取得: 既定では 17 秒ごと。 ホストは定期的にリース ストアの状態をチェックし、動的スケーリング プロセスの一環としてリースの取得を検討します。 このプロセスは、リース コンテナーに対してクエリを実行することによって行われます。 この値を小さくすると、リースの再調整と取得が速くなりますが、リース コンテナーでの要求ユニットの消費量が増加します。
  • リースの有効期限: 既定では 60 秒。 リースが、別のホストによって取得されるまで、更新アクティビティなしで存在できる最大時間を定義します。 ホストがクラッシュすると、そのホストが所有するリースは、構成された更新間隔にこの期間を加算した時間の経過後に、他のホストによって取得されます。 この値を小さくすると、ホストのクラッシュ後の復旧が速くなりますが、有効期限の値は更新間隔よりも短くしないでください。
  • リースの更新: 既定では 13 秒ごと。 リースを所有するホストは、処理する新しい変更がない場合でも、定期的にリースを更新します。 このプロセスは、リースで Replace を実行することによって行われます。 この値を小さくすると、ホストのクラッシュによって失われたリースを検出するのに必要な時間が短縮されますが、リース コンテナーでの要求ユニットの消費量が増加します。

変更フィード プロセッサをホストする場所

変更フィード プロセッサは、長時間実行されるプロセスまたはタスクをサポートする任意のプラットフォームでホストできます。 次に例をいくつか示します。

変更フィード プロセッサは、その状態がリース コンテナーによって維持されるため、有効期間が短い環境で実行できますが、これらの環境の開始サイクルにより、通知の受信に遅延が加わります (環境が開始されるたびにプロセッサを開始するオーバーヘッドが生じるため)。

ロールベースのアクセスの要件

認証メカニズムとして Microsoft Entra ID を使用する場合は、ID に適切なアクセス許可があることを確認します。

  • 監視対象コンテナー上で:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • リース コンテナー上で:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

その他のリソース

次のステップ

以下の記事で、変更フィード プロセッサの詳細をご覧ください。