ストレージ BLOB を使用した Javascript 用のチェックポイント ストア ライブラリのAzure Event Hubs
チェックポイントを格納し、@azure/event-hubs ライブラリからを使用EventHubConsumerClient
するときの負荷分散を支援する Azure Blob Storage ベースのソリューション
ソースコード | パッケージ (npm) | API リファレンス ドキュメント | サンプル
作業の開始
パッケージをインストールする
npm を使用してAzure Event Hubs チェックポイント ストア BLOB ライブラリをインストールする
npm install @azure/eventhubs-checkpointstore-blob
前提条件: Azure サブスクリプション、このパッケージを使用するための Event Hubs 名前空間、ストレージ アカウントが必要です
Node.js アプリケーションでこのパッケージを使用している場合は、Node.js 8.x 以降を使用します。
Typescript の構成
TypeScript ユーザーには、Node 型の定義がインストールされている必要があります。
npm install @types/node
また、tsconfig.json で を有効にする compilerOptions.allowSyntheticDefaultImports
必要があります。 が有効compilerOptions.esModuleInterop
allowSyntheticDefaultImports
になっている場合は、既定で が有効になっていることに注意してください。 詳細については、「 TypeScript のコンパイラ オプション ハンドブック 」を参照してください。
主要な概念
スケール: 複数のコンシューマーを作成します。それぞれのコンシューマーは、いくつかの Event Hubs のパーティションからの読み取りの所有権を保持します。
負荷分散:負荷分散をサポートするアプリケーションは、同じイベント ハブとコンシューマー グループと同じ からのイベントを使用するように構成されている の 1 つ以上の
CheckpointStore
インスタンスEventHubConsumerClient
で構成されています。 処理するパーティションを分散することで、異なるインスタンス間でワークロードのバランスを取ります。チェックポイント: これは、リーダーがパーティション イベント シーケンス内の位置をマークまたはコミットするプロセスです。 チェックポイント処理はコンシューマーの責任で行います。この処理はコンシューマー グループ内でパーティションごとに発生します。 つまり、コンシューマー グループごとに、各パーティション リーダーは、イベント ストリーム内でのその現在の位置を追跡する必要があり、データ ストリームが完了したと見なしたときにサービスに通知することができます。
リーダーがパーティションから切断し、その後再び接続すると、該当するコンシューマー グループ内の該当するパーティションの最後のリーダーによって最後に送信されたチェックポイントから読み取りが開始されます。 リーダーは接続の際に、このオフセットをイベント ハブに渡して、読み取りを開始する場所を指定します。 このように、チェックポイント処理を使用することで、ダウンストリーム アプリケーションごとにイベントに "完了" のマークを付けると共に、異なるコンピューター上で実行中のリーダー間でフェールオーバーが発生した場合に回復性をもたらすことができます。 このチェックポイント処理で、より小さなオフセットを指定すると、古いデータに戻ることができます。 このメカニズムにより、チェックポイント処理ではフェールオーバーの回復性とイベント ストリームの再生の両方を実現できます。
BlobCheckpointStore は、読み込みと更新のチェックポイントのバランスを取るために EventHubConsumerClient に必要なキー メソッドを実装するクラスです。
例
Azure Blob Storageを使用して をCheckpointStore
作成する
次のコード スニペットを使用して を作成します CheckpointStore
。 ストレージ アカウントに接続文字列を指定する必要があります。
import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!containerClient.exists()) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
Azure Blob Storage を使用してイベントをチェックポイント処理する
Azure Blob Storageを使用して受信したイベントをチェックポイント処理するには、SubscriptionEventHandlers インターフェイスと互換性のあるオブジェクトを、メソッドを呼び出すupdateCheckpoint()
コードと共に渡す必要があります。
この例では、 SubscriptionHandlers
SubscriptionEventHandlers を 実装し、チェックポイント処理も処理します。
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";
const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!(await containerClient.exists())) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
class SubscriptionHandlers {
async processEvents(event, context) {
// custom logic for processing events goes here
// Checkpointing will allow your service to restart and pick
// up from where it left off.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(event);
}
async processError(err, context) {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription: ${err}`);
}
}
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);
const subscription = consumerClient.subscribe(new SubscriptionHandlers());
// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();
トラブルシューティング
ログの有効化
環境変数を AZURE_LOG_LEVEL
次のいずれかの値に設定して、 へのログ記録を stderr
有効にすることができます。
- verbose
- info
- warning
- error
@azure/ロガー パッケージをインポートし、いずれかのログ レベル値を使用して関数をsetLogLevel
呼び出すことで、ログ レベルをプログラムで設定することもできます。
ログ レベルをプログラムまたは環境変数を使用して AZURE_LOG_LEVEL
設定すると、選択したログ レベル以下のログ レベルを使用して書き込まれたログが出力されます。
たとえば、ログ レベルを に info
設定すると、レベル warning
に対して書き込まれ、 error
出力されるログも出力されます。
この SDK は、どのレベルにログを記録するかを決定する際に、TypeScript 用の Azure SDK ガイドライン に従います。
または、このライブラリを使用するときにログを DEBUG
取得するように環境変数を設定することもできます。
これは、依存関係rhea-promise
rhea
からログを出力する場合にも役立ちます。
メモ: AZURE_LOG_LEVEL設定されている場合は、DEBUG よりも優先されます。
AZURE_LOG_LEVELを指定 azure
する場合や setLogLevel を呼び出す場合は、DEBUG を使用してライブラリを指定しないでください。
このライブラリの使用時にデバッグ ログを表示するには、次の環境変数を取得します。
- Eventhubs Checkpointstore BLOB から情報レベルのデバッグ ログのみを取得します。
export DEBUG=azure:eventhubs-checkpointstore-blob:info
ファイルへのログ記録
上記のようにログ記録を有効にし、次のようにテスト スクリプトを実行します。
テスト スクリプトのログ ステートメントは に
out.log
移動し、sdk のログ ステートメントは に移動しますdebug.log
。node your-test-script.js > out.log 2>debug.log
テスト スクリプトと sdk からのログ ステートメントは、stderr を stdout (&1) にリダイレクトして同じファイル
out.log
に移動し、stdout をファイルにリダイレクトします。node your-test-script.js >out.log 2>&1
テスト スクリプトと sdk からのログ ステートメントは、同じファイル
out.log
に移動します。node your-test-script.js &> out.log
次の手順
詳細な例については、 サンプル ディレクトリを参照してください。
共同作成
このライブラリに投稿する場合、コードをビルドしてテストする方法の詳細については、投稿ガイドを参照してください。
Azure SDK for JavaScript