.NET 用Azure Event Hubs イベント プロセッサ クライアント ライブラリ - バージョン 5.9.3
Azure Event Hubsは、1 秒あたり何百万ものイベントを取り込み、複数のコンシューマーにストリーミングできる、拡張性の高い発行サブスクライブ サービスです。 これにより、接続されたデバイスとアプリケーションによって生成される大量のデータを処理して分析できます。 Event Hubs がデータを収集したら、リアルタイム分析プロバイダーを使用するか、バッチ処理/ストレージ アダプターを使用してデータを取得、変換、格納できます。 Azure Event Hubsの詳細については、「Event Hubs とは」を参照してください。
イベント プロセッサ クライアント ライブラリは、Azure Event Hubs クライアント ライブラリに付属しており、運用環境のシナリオの大部分に適した堅牢で耐久性のあるスケーラブルな方法でイベントを使用するためのスタンドアロン クライアントを提供します。 Azure Storage BLOB を使用して構築された推奨される実装では、次の場合にイベント プロセッサを使用することをお勧めします。
一時的な障害や断続的なネットワークの問題に対する回復性を備えた大規模なイベント ハブのすべてのパーティションにわたるイベントの読み取りと処理。
複数のプロセッサがコンシューマー グループのコンテキストで責任を動的に分散および共有するイベントを協調的に処理し、プロセッサの追加とグループからの削除に応じて負荷を適切に管理します。
基になるデータ ストアとして Azure Storage BLOB を使用して永続的な方法で処理するためのチェックポイントと状態を管理する。
ソースコード | パッケージ (NuGet) | API リファレンス ドキュメント | 製品ドキュメント | トラブルシューティング ガイド
作業の開始
前提条件
Azure サブスクリプション:Azure Event Hubsを含む Azure サービスを使用するには、サブスクリプションが必要です。 既存の Azure アカウントをお持ちでない場合は、無料試用版にサインアップするか、アカウントの作成時に Visual Studio サブスクリプション特典を使用できます。
Event Hubs 名前空間とイベント ハブ:Azure Event Hubsと対話するには、名前空間と Event Hub も使用できる必要があります。 Azure リソースの作成に慣れていない場合は、Azure portalを使用して Event Hub を作成するためのステップ バイ ステップ ガイドに従ってください。 ここでは、Azure CLI、Azure PowerShell、または Azure Resource Manager (ARM) テンプレートを使用してイベント ハブを作成するための詳細な手順も確認できます。
BLOB ストレージを使用する Azure Storage アカウント: チェックポイントを保持し、Azure Storage の所有権を管理するには、BLOB を使用できる Azure Storage アカウントが必要です。 プロセッサに使用される Azure Storage アカウントでは、論理的な削除と BLOB のバージョン管理が無効になっている必要があります。 Azure Storage アカウントに慣れていない場合は、Azure portalを使用してストレージ アカウントを作成するためのステップバイステップ ガイドに従ってください。 また、Azure CLI、Azure PowerShell、または Azure Resource Manager (ARM) テンプレートを使用してストレージ アカウントを作成するための詳細な手順も確認できます。
Azure Storage BLOB コンテナー: Azure Storage のチェックポイントと所有権のデータは、特定のコンテナー内の BLOB に書き込まれます。 には
EventProcessorClient
既存のコンテナーが必要であり、誤った構成を防ぐために暗黙的に作成されることはありません。 イベント ハブとコンシューマー グループの組み合わせごとに一意のコンテナーを使用することをお勧めします。 Azure Storage コンテナーに慣れていない場合は、 コンテナーの管理に関するドキュメントを参照してください。 ここでは、.NET、Azure CLI、またはAzure PowerShellを使用してコンテナーを作成するための詳細な手順を確認できます。C# 8.0:Azure Event Hubs クライアント ライブラリでは、C# 8.0 で導入された新機能が使用されます。 C# 8.0 構文を利用するには、 の言語バージョン
latest
で .NET Core SDK 3.0 以降を使用してコンパイルすることをお勧めします。C# 8.0 構文を最大限に活用したい Visual Studio ユーザーは、Visual Studio 2019 以降を使用する必要があります。 無料の Community エディションを含む Visual Studio 2019 は、
[ こちら](https://visualstudio.microsoft.com/vs/) からダウンロードできます。 Visual Studio 2017 のユーザーは、 Microsoft.Net.Compilers NuGet パッケージ を使用して言語バージョンを設定することで C# 8 構文を利用できます。ただし、編集エクスペリエンスは理想的ではない場合があります。以前の C# 言語バージョンでもライブラリを使用できますが、新しい構文の恩恵を受けるのではなく、非同期列挙可能メンバーと非同期破棄可能メンバーを手動で管理する必要があります。 以前のバージョンの .NET Core や .NET Framework を含め、.NET Core SDK でサポートされているフレームワーク バージョンは引き続き対象にすることができます。 詳細については、「 ターゲット フレームワークを指定する方法」を参照してください。
重要な注意:サンプルとサンプルを変更せずにビルドまたは実行するには、C# 11.0 を使用する必要があります。 他の言語バージョン用に調整する場合は、引き続きサンプルを実行できます。
Azure で必要なリソースをすばやく作成し、それらの接続文字列を受け取るために、次をクリックしてサンプル テンプレートをデプロイできます。
パッケージをインストールする
NuGet を使用して.NET 用Azure Event Hubs イベント プロセッサ クライアント ライブラリをインストールします。
dotnet add package Azure.Messaging.EventHubs.Processor
クライアントを認証する
Event Hubs 接続文字列を取得する
Event Hubs クライアント ライブラリが Event Hub と対話するには、それに接続して承認する方法を理解する必要があります。 これを行う最も簡単な方法は、Event Hubs 名前空間の作成時に自動的に作成される接続文字列を使用することです。 Event Hubs での接続文字列の使用に慣れていない場合は、ステップ バイ ステップ ガイドに従って Event Hubs 接続文字列を取得できます。
Azure Storage 接続文字列を取得する
イベント プロセッサ クライアントがチェックポイント処理に Azure Storage BLOB を使用するには、ストレージ アカウントに接続して承認する方法を理解する必要があります。 これを行う最も簡単な方法は、ストレージ アカウントの作成時に生成される接続文字列を使用することです。 Azure でのストレージ アカウント接続文字列の承認に慣れていない場合は、ステップ バイ ステップ ガイドに従って Azure Storage 接続文字列を構成できます。
接続文字列を取得したら、 それらを使用してプロセッサを作成する 方法の例については、「イベント プロセッサ クライアントの作成」を参照してください。
主要な概念
イベント プロセッサは、特定のコンシューマー グループのコンテキストで、特定のイベント ハブへの接続と各パーティションからのイベントの処理に関連する責任を管理することを目的としたコンストラクトです。 パーティションから読み取られたイベントを処理し、発生したエラーを処理する行為は、イベント プロセッサによって提供されたコードに委任され、プロセッサがイベントの読み取り、パーティションの管理、チェックポイントの形式での状態の永続化に関連するタスクを処理する間、ロジックがビジネス価値の提供に集中できるようになります。
チェックポイント処理 は、パーティションに対して処理されたイベントの位置をリーダーがマークして保持するプロセスです。 チェックポイント処理はコンシューマーの責任であり、パーティションごと (通常は特定のコンシューマー グループのコンテキスト) で発生します。
EventProcessorClient
の場合は、コンシューマー グループとパーティションの組み合わせの場合、プロセッサはイベント ストリーム内の現在の位置を追跡する必要があることを意味します。 詳細については、Event Hubs 製品ドキュメントの チェックポイントを 参照してください。イベント プロセッサが接続すると、そのコンシューマー グループ内のそのパーティションの最後のプロセッサによって以前に永続化されていたチェックポイントでイベントの読み取りが開始されます (存在する場合)。 イベント プロセッサは、パーティション内のイベントを読み取って処理する際に、ダウンストリーム アプリケーションによってイベントを "完了" としてマークし、イベント プロセッサまたはイベント プロセッサをホストする環境が失敗した場合に回復性を提供するチェックポイントを定期的に作成する必要があります。 必要に応じて、このチェックポイント処理プロセスを通じて以前のオフセットを指定することで、以前に "完了" としてマークされたイベントを再処理できます。
パーティションは、Event Hub に保持される順序付けされた一連のイベントです。 パーティションは、イベント コンシューマーに必要な並列処理に関連付けられたデータ編成の手段です。 Azure Event Hubs では、パーティション分割されたコンシューマー パターンを介してメッセージ ストリーミングを提供し、各コンシューマーがメッセージ ストリームの特定のサブセット (パーティション) のみを読み取ります。 新しいイベントが到着すると、このシーケンスの末尾に追加されます。 パーティションの数は、Event Hub の作成時に指定され、変更することはできません。
コンシューマー グループはは、Event Hub 全体のビューです。 コンシューマー グループを使用すると、複数のコンシューマー アプリケーションが個別のイベント ストリーム ビューを持つことができるようになり、それぞれの場所から独自のペースでストリームを個別に読み取ることができます。 コンシューマー グループあたり最大 5 つのリーダーをパーティションに同時に設定できますが、特定のパーティションとコンシューマー グループの組み合わせには、アクティブな 1 つのコンシューマーのみをお勧めします。 各アクティブ リーダーでは、そのパーティションからすべてのイベントを受け取ります。同じパーティションに複数のリーダーがある場合は、重複するイベントを受信します。
その他の概念と詳細な説明については、「 Event Hubs の機能」を参照してください。
クライアントの有効期間
EventProcessorClient
は、アプリケーションの有効期間中にシングルトンとしてキャッシュして使用しても安全です。これは、イベントが定期的に読み取られている場合のベスト プラクティスです。 クライアントは、ネットワーク、CPU、メモリの使用を効率的に管理し、非アクティブな期間中に使用率を低く保つ役割を担います。 ネットワーク リソースやその他のアンマネージド オブジェクトが適切にクリーンアップされるようにするには、プロセッサで または StopProcessing
を呼び出StopProcessingAsync
す必要があります。
スレッド セーフ
すべてのクライアント インスタンス メソッドがスレッド セーフであり、相互に独立していることを保証します (ガイドライン)。 これにより、クライアント インスタンスの再利用に関する推奨事項は、スレッド間でも常に安全になります。
や などのEventData
EventDataBatch
データ モデル型はスレッド セーフではありません。 スレッド間で共有したり、クライアント メソッドと同時に使用したりしないでください。
その他の概念
クライアント オプション | イベント ハンドラー | エラーの | 処理診断 | モック (プロセッサ) | モック作成 (クライアントの種類)
例
イベント プロセッサ クライアントの作成
EventProcessorClient
は、状態の永続化のために Azure Storage BLOB に依存しているため、プロセッサに BlobContainerClient
を指定する必要があります。これは、使用する必要があるストレージ アカウントとコンテナーに構成されています。 を構成するために使用するコンテナーが EventProcessorClient
存在する必要があります。
EventProcessorClient
には存在しないコンテナーを指定する意図を知る方法がないため、コンテナーは暗黙的に作成されません。 これは、誤って構成されたコンテナーに対するガードとして機能し、悪意のあるプロセッサが所有権を共有できず、コンシューマー グループ内の他のプロセッサと干渉します。
// The container specified when creating the BlobContainerClient must exist; it will
// not be implicitly created.
var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
イベント ハンドラーとエラー ハンドラーを構成する
を使用するには、 EventProcessorClient
イベント処理とエラーのハンドラーを指定する必要があります。 これらのハンドラーは自己完結型と見なされ、開発者はハンドラー コード内の例外が確実に考慮されるようにする必要があります。
var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";
async Task processEventHandler(ProcessEventArgs eventArgs)
{
try
{
// Perform the application-specific processing for an event. This method
// is intended for illustration and is not defined in this snippet.
await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
}
catch
{
// Handle the exception from handler code
}
}
async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
try
{
// Perform the application-specific processing for an error. This method
// is intended for illustration and is not defined in this snippet.
await DoSomethingWithTheError(eventArgs.Exception);
}
catch
{
// Handle the exception from handler code
}
}
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
処理の開始と停止
は EventProcessorClient
、明示的に開始されるとバックグラウンドで処理を実行し、明示的に停止されるまで処理を続行します。 これにより、アプリケーション コードは他のタスクを実行できるようになりますが、他のタスクが実行されていない場合は、処理中にプロセスが終了しないようにする責任も負います。
var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";
Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
await processor.StartProcessingAsync();
try
{
// The processor performs its work in the background; block until cancellation
// to allow processing to take place.
await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
// This is expected when the delay is canceled.
}
try
{
await processor.StopProcessingAsync();
}
finally
{
// To prevent leaks, the handlers should be removed when processing is complete.
processor.ProcessEventAsync -= processEventHandler;
processor.ProcessErrorAsync -= processErrorHandler;
}
イベント プロセッサ クライアントでの Active Directory プリンシパルの使用
Azure Identity ライブラリは、Event Hubs や Azure Storage などの Azure クライアント ライブラリに使用できる Azure Active Directory 認証のサポートを提供します。
Active Directory プリンシパルを使用するために、Event Hubs クライアントの作成時に Azure.Identity
ライブラリから使用可能な資格情報の 1 つが指定されます。 さらに、Event Hubs の接続文字列の代わりに、完全修飾 Event Hubs 名前空間と目的のイベント ハブの名前が指定されます。
Azure Storage BLOB コンテナーで Active Directory プリンシパルを使用するには、ストレージ クライアントの作成時にコンテナーの完全修飾 URL を指定する必要があります。 Blob Storage にアクセスするための有効な URI 形式の詳細については、「 コンテナー、BLOB、メタデータの名前付けと参照」を参照してください。
var credential = new DefaultAzureCredential();
var blobStorageUrl ="<< FULLY-QUALIFIED CONTAINER URL (like https://myaccount.blob.core.windows.net/mycontainer) >>";
var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";
var storageClient = new BlobContainerClient(new Uri(blobStorageUrl), credential);
var processor = new EventProcessorClient
(
storageClient,
consumerGroup,
fullyQualifiedNamespace,
eventHubName,
credential
);
Event Hubs で Azure Active Directory を使用する場合は、ロールなどの Event Hubs からの読み取りを許可するロールがプリンシパルに Azure Event Hubs Data Receiver
割り当てられている必要があります。 Event Hubs で Azure Active Directory 承認を使用する方法の詳細については、 関連するドキュメントを参照してください。
Azure Storage で Azure Active Directory を使用する場合は、ロールなどの BLOB への読み取り、書き込み、削除アクセスを許可するロールがプリンシパルに Storage Blob Data Contributor
割り当てられている必要があります。 Azure Storage での Active Directory 承認の使用の詳細については、 関連するドキュメント と Azure Storage 承認サンプルを参照してください。
トラブルシューティング
トラブルシューティングの詳細については、 Event Hubs トラブルシューティング ガイドを参照してください。
例外処理
イベント プロセッサ クライアントの例外
イベント プロセッサ クライアントは、例外が発生した場合に回復性を持つすべての試行を行い、不可能でない限り、処理を続行するために必要なアクションを実行します。 これを行うために開発者からのアクションは必要ありません。これはネイティブにプロセッサの動作の一部です。
開発者がイベント プロセッサ クライアント操作内で発生する例外を検査して対応できるようにするために、イベントを ProcessError
介して表示されます。 このイベントの引数は、例外とそれが観察されたコンテキストに関する詳細を提供します。 開発者は、エラーに応じてイベント プロセッサ クライアントを停止または再起動するなど、このイベント ハンドラー内から通常の操作を実行できますが、それ以外の場合はプロセッサの例外動作に影響を与えない場合があります。
エラー ハンドラーを実装する基本的な例については、サンプル「 イベント プロセッサ ハンドラー」を参照してください。
イベント ハンドラーの例外
イベント プロセッサ クライアントには、開発者が提供するイベント ハンドラー内の例外の重大度を理解するための適切なコンテキストがないため、適切な応答となるアクションを想定できません。 その結果、開発者は、ブロックやその他の標準言語コンストラクトを使用して提供するイベント ハンドラー内で発生する例外に対して try/catch
責任を負うことになります。
イベント プロセッサ クライアントは、開発者コードで例外を検出したり、明示的に表示したりしません。 結果の動作は、プロセッサのホスティング環境と、イベント ハンドラーが呼び出されたコンテキストによって異なります。 これはシナリオによって異なる可能性があるため、開発者はイベント ハンドラーを防御的にコーディングし、潜在的な例外を考慮することを強くお勧めします。
ログ記録と診断
イベント プロセッサ クライアント ライブラリは、.NET EventSource
を使用して情報を出力するために、さまざまな詳細レベルで情報をログに記録するために完全にインストルメント化されています。 ログ記録は操作ごとに実行され、操作の開始点、完了、および発生した例外をマークするパターンに従います。 分析情報を提供する可能性がある追加情報も、関連する操作のコンテキストに記録されます。
イベント プロセッサ クライアント ログは、"Azure-Messaging-EventHubs-Processor-EventProcessorClient" という名前のソースにオプトインするか、"AzureEventSource" という特性を持つすべてのソースをオプトインすることで、任意 EventListener
のユーザーが使用できます。 Azure クライアント ライブラリからのログのキャプチャを容易にするために、 Azure.Core
Event Hubs で使用されるライブラリには が AzureEventSourceListener
用意されています。 詳細については、「 AzureEventSourceListener を使用した Event Hubs ログのキャプチャ」を参照してください。
イベント プロセッサ ライブラリは、Application Insights または OpenTelemetry を使用した分散トレース用にもインストルメント化されています。 詳細については、 Azure.Core Diagnostics サンプルを参照してください。
次のステップ
説明したシナリオに加えて、Azure Event Hubs プロセッサ ライブラリでは、 の完全な機能セットを活用するのに役立つ追加シナリオのサポートがEventProcessorClient
提供されています。 これらのシナリオの一部を調べるのに役立つ目的で、Event Hubs Processor クライアント ライブラリには、一般的なシナリオの図として機能するサンプルのプロジェクトが用意されています。 詳細については、サンプル README を参照してください。
共同作成
このプロジェクトでは、共同作成と提案を歓迎しています。 ほとんどの共同作成では、共同作成者使用許諾契約書 (CLA) にご同意いただき、ご自身の共同作成内容を使用する権利を Microsoft に供与する権利をお持ちであり、かつ実際に供与することを宣言していただく必要があります。 詳細については、 https://cla.microsoft.com を参照してください。
pull request を送信すると、CLA を提供して PR (ラベル、コメントなど) を適宜装飾する必要があるかどうかを CLA ボットが自動的に決定します。 ボットによって提供される手順にそのまま従ってください。 この操作は、Microsoft の CLA を使用するすべてのリポジトリについて、1 回だけ行う必要があります。
このプロジェクトでは、Microsoft オープン ソースの倫理規定を採用しています。 詳しくは、「Code of Conduct FAQ (倫理規定についてよくある質問)」を参照するか、opencode@microsoft.com 宛てに質問またはコメントをお送りください。
詳細については、 投稿ガイド を参照してください。