変更フィード プロセッサ ライブラリから Azure Cosmos DB .NET V3 SDK に移行する
適用対象: NoSQL
この記事では、変更フィード プロセッサ ライブラリを使用する既存のアプリケーションのコードを、最新バージョンの .NET SDK (.NET V3 SDK とも呼ばれる) の変更フィード機能に移行するために必要な手順について説明します。
必要なコード変更
.NET V3 SDK には、いくつかの破壊的変更があります。ご利用のアプリケーションを移行する上で重要な手順を次に示します。
DocumentCollectionInfo
インスタンスを、監視されているコンテナーおよびリース コンテナーに対するContainer
参照に変換します。WithProcessorOptions
を使用しているカスタマイズについては、間隔にWithLeaseConfiguration
とWithPollInterval
を使用し、開始時刻にWithStartTime
を使用し、最大項目数を定義するためにWithMaxItems
を使用するように更新する必要があります。GetChangeFeedProcessorBuilder
上のprocessorName
についてはChangeFeedProcessorOptions.LeasePrefix
上で構成されている値と一致するように設定します。それ以外の場合はstring.Empty
を使用します。- 変更は
IReadOnlyList<Document>
として配信されなくなりました。代わりに、それはIReadOnlyCollection<T>
となります。ここで、T
は定義する必要のある型です。基本項目クラスはもうありません。 - 変更を処理する場合、
IChangeFeedObserver
の実装は不要になりました。代わりに、デリゲートを定義する必要があります。 デリゲートは静的関数とすることができます。実行中に状態を維持する必要がある場合は、独自のクラスを作成して、インスタンス メソッドをデリゲートとして渡すこともできます。
たとえば、変更フィード プロセッサを構築する元のコードが次のようであるとします。
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.monitoredContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.leasesContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
.WithHostName("consoleHost")
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
{
StartFromBeginning = true,
LeasePrefix = "MyLeasePrefix",
MaxItemCount = 10,
FeedPollDelay = TimeSpan.FromSeconds(1)
})
.WithFeedCollection(monitoredCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithObserver<ChangeFeedObserver>()
.BuildAsync();
移行されたコードは次のようになります。
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithMaxItems(10)
.WithPollInterval(TimeSpan.FromSeconds(1))
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
デリゲートの場合は、静的メソッドを使用してイベントを受信できます。 IChangeFeedObserverContext
からの情報を使用していた場合は、ChangeFeedProcessorContext
を使用するように移行できます。
IChangeFeedObserverContext.PartitionKeyRangeId
の代わりにChangeFeedProcessorContext.LeaseToken
を使用できますIChangeFeedObserverContext.FeedResponse
の代わりにChangeFeedProcessorContext.Headers
を使用できますChangeFeedProcessorContext.Diagnostics
には、トラブルシューティングのために、要求の待機時間に関する詳細情報が含まれています
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate work
await Task.Delay(1);
}
}
正常性イベントと可観測性
以前に IHealthMonitor
を使用していた場合、または IChangeFeedObserver.OpenAsync
と IChangeFeedObserver.CloseAsync
を利用していた場合は、Notifications API を使用します。
IChangeFeedObserver.OpenAsync
はWithLeaseAcquireNotification
に置き換えることができます。IChangeFeedObserver.CloseAsync
はWithLeaseReleaseNotification
に置き換えることができます。IHealthMonitor.InspectAsync
はWithErrorNotification
に置き換えることができます。
静的なリース コンテナー
変更フィード プロセッサ ライブラリと同様に、.NET V3 SDK の変更フィード機能でもリース コンテナーを使用して状態が格納されます。 ただし、スキーマは異なります。
SDK V3 変更フィード プロセッサでは、移行されたアプリケーション コードの最初の実行時に、古いライブラリの状態が検出され、新しいスキーマに自動的に移行されます。
古いコードを使用するアプリケーションを安全に停止し、そのコードを新しいバージョンに移行し、移行されたアプリケーションを起動することができ、アプリケーションの停止中に発生した変更はいずれも新しいバージョンによって取得および処理されます。
その他のリソース
次のステップ
以下の記事で、変更フィード プロセッサに関してさらに詳しく知ることができます。
- 変更フィード プロセッサの概要
- 変更フィード推定機能の使用
- 変更フィード プロセッサの開始時刻
- Azure Cosmos DB への移行のための容量計画を実行しようとしていますか?
- 既存のデータベース クラスター内の仮想コアとサーバーの数のみがわかっている場合は、仮想コア数または仮想 CPU 数を使用した要求ユニットの見積もりに関するページを参照してください
- 現在のデータベース ワークロードに対する通常の要求レートがわかっている場合は、Azure Cosmos DB Capacity Planner を使用した要求ユニットの見積もりに関するページを参照してください