処理アプリケーションをスケーリングする
イベント処理アプリケーションをスケーリングするには、アプリケーションのインスタンスを複数実行し、それらのインスタンス間で負荷のバランスを取ります。 以前のバージョンでは、EventProcessorHost を使用することで、プログラムの複数のインスタンス間での負荷と、受信時のチェックポイントイ ベントのバランスを取ることができました。 新しいバージョン (5.0 以降) では EventProcessorClient (.NET および Java) または EventHubConsumerClient (Python および JavaScript) を使用して、同じ処理を実行できます。
Note
Event Hubs をスケーリングするための鍵となるのは、パーティション分割されたコンシューマーのアイデアです。 競合コンシューマー パターンとは対照的に、パーティション分割されたコンシューマー パターンは、競合のボトルネックを除去し、エンド ツー エンドの並列処理を容易にすることによって、高スケールを可能にします。
サンプル シナリオ
シナリオの例として、10 万件の家を監視するホーム セキュリティ企業を考えてみましょう。 この会社では、各家庭に設置された動体検知器、ドアや窓の開閉センサー、ガラス破損検知器などのさまざまなセンサーから常にデータを取得しています。 この会社では、住民がほぼリアルタイムで自宅の様子を監視できる Web サイトを開設しています。
各センサーにより、データがイベント ハブにプッシュされます。 イベント ハブは、16 個のパーティションで構成されます。 コンシューマー側では、これらのイベントを読み取って統合し、集計をストレージ BLOB にダンプするメカニズムが必要です。これは、ユーザー フレンドリな Web ページに投影されます。
分散環境でコンシューマーを設計する場合、シナリオで次の要件を処理する必要があります。
- スケール: 複数のコンシューマーを作成します。それぞれのコンシューマーは、いくつかの Event Hubs のパーティションからの読み取りの所有権を保持します。
- 負荷分散: コンシューマーを動的に増減します。 たとえば、新しいセンサーの種類 (たとえば、一酸化炭素検知器) が各家庭に追加されると、イベントの数が増加します。 その場合は、オペレーター (人間) がコンシューマー インスタンスの数を増やします。 すると、コンシューマーのプールにより、それ自体が所有するパーティションの数を再調整して、新しく追加されたコンシューマーと負荷を共有することができます。
- 失敗時のシームレスな再開: ホストとなっている仮想マシンが突然クラッシュしたなどの理由でコンシューマー (コンシューマー A) が失敗した場合、コンシューマー A が所有しているパーティションを他のコンシューマーが選択して続行できます。 また、"チェックポイント" または "オフセット" と呼ばれる継続ポイントは、コンシューマー A が失敗した正確なポイントであるか、その少し前のポイントである必要があります。
- イベントの使用: 前の 3 つのポイントはコンシューマーの管理を扱っていますが、イベントを使用して実用的な操作を行うには、コードが必要になります。 たとえば、イベントを集計し、BLOB ストレージにアップロードするなどです。
イベント プロセッサまたはコンシューマー クライアント
これらの要件を満たすために独自のソリューションを構築する必要はありません。 この機能は、Azure Event Hubs SDK によって提供されます。 .NET または Java SDK では、イベント プロセッサ クライアント (EventProcessorClient
) を使用し、Python と JavaScript SDK で EventHubConsumerClient
を使用します。
ほとんどの運用シナリオでは、イベントの読み取りと処理にイベント プロセッサ クライアントを使用することをお勧めします。 イベント プロセッサ クライアントは、特定のイベント ハブ用にコンシューマー グループのコンテキスト内で協調的に動作できます。 クライアントは、インスタンスがそのグループに対して使用可能または使用不可能になると、自動的に作業の配布と分散を管理します。
パーティションの所有権の追跡
通常、イベント プロセッサ インスタンスは、1 つまたは複数のパーティションからのイベントを所有および処理します。 パーティションの所有権は、イベント ハブとコンシューマー グループの組み合わせに関連付けられているすべてのアクティブなイベント プロセッサ インスタンス間で均等に分散されます。
各イベント プロセッサには一意識別子が与えられ、チェックポイント ストアのエントリを追加または更新することで、パーティションの所有権を要求します。 すべてのイベント プロセッサ インスタンスによって、このストアとの定期的な通信が行われ、自身の処理状態が更新されるとともに、他のアクティブなインスタンスについての学習が行われます。 このデータは、アクティブなプロセッサ間で負荷を分散するために使用されます。
メッセージを受信する
イベント プロセッサを作成する場合、イベントとエラーを処理する関数を指定します。 イベントを処理する関数を呼び出すたびに、特定のパーティションから 1 つのイベントが配信されます。 このイベントの処理はユーザーが行う必要があります。 コンシューマーによってすべてのメッセージが 1 回以上処理されることを確認する場合は、再試行ロジックを含む独自のコードを作成する必要があります。 ただし、有害メッセージについて注意してください。
これは迅速に済ませることをお勧めします。 つまり、できる限り最小限の処理に留めます。 ストレージへの書き込みとルーティングを行う必要がある場合、2 つのコンシューマー グループを使用して 2 つのイベント プロセッサを所有することをお勧めします。
チェックポイント機能
チェックポイント処理とは、イベント プロセッサがパーティション内の最後に正常に処理されたイベントの位置をマークまたはコミットするために使用する処理です。 通常、チェックポイントのマーク付けはイベントを処理する関数内で実行され、コンシューマー グループ内のパーティションごとに発生します。
イベント プロセッサがパーティションから切断されると、別のインスタンスが、そのコンシューマー グループ内のそのパーティションの最後のプロセッサによって以前にコミットされたチェックポイントからパーティションの処理を再開できます。 プロセッサは接続の際に、このオフセットをイベント ハブに渡して、読み取りを開始する場所を指定します。 このように、チェックポイント処理を使用することで、ダウンストリーム アプリケーションごとにイベントに "完了" のマークを付けると共に、イベント プロセッサがダウンしたときに回復性をもたらすことができます。 このチェックポイント処理で、より小さなオフセットを指定すると、古いデータに戻ることができます。
スレッドの安全性とプロセッサのインスタンス
既定では、イベントを処理する関数は、特定のパーティションに対して順番に呼び出されます。 後続のイベントと同じパーティションからのこの関数に対する呼び出しは、メッセージ ポンプが他のスレッドのバックグラウンドで引き続き実行されるため、バックグラウンドでキューに配置されます。 異なるパーティションからのイベントは同時に処理できるため、パーティションをまたがってアクセスされる共有状態は同期される必要があります。