次の方法で共有


ストレージ BLOB を使用した Javascript 用のチェックポイント ストア ライブラリのAzure Event Hubs

チェックポイントを格納し、@azure/event-hubs ライブラリからを使用EventHubConsumerClientするときの負荷分散を支援する Azure Blob Storage ベースのソリューション

ソースコード | パッケージ (npm) | API リファレンス ドキュメント | サンプル

作業の開始

パッケージをインストールする

npm を使用してAzure Event Hubs チェックポイント ストア BLOB ライブラリをインストールする

npm install @azure/eventhubs-checkpointstore-blob

前提条件: Azure サブスクリプション、このパッケージを使用するための Event Hubs 名前空間、ストレージ アカウントが必要です

Node.js アプリケーションでこのパッケージを使用している場合は、Node.js 8.x 以降を使用します。

Typescript の構成

TypeScript ユーザーには、Node 型の定義がインストールされている必要があります。

npm install @types/node

また、tsconfig.json で を有効にする compilerOptions.allowSyntheticDefaultImports 必要があります。 が有効compilerOptions.esModuleInteropallowSyntheticDefaultImportsになっている場合は、既定で が有効になっていることに注意してください。 詳細については、「 TypeScript のコンパイラ オプション ハンドブック 」を参照してください。

主要な概念

  • スケール: 複数のコンシューマーを作成します。それぞれのコンシューマーは、いくつかの Event Hubs のパーティションからの読み取りの所有権を保持します。

  • 負荷分散:負荷分散をサポートするアプリケーションは、同じイベント ハブとコンシューマー グループと同じ からのイベントを使用するように構成されている の 1 つ以上のCheckpointStoreインスタンスEventHubConsumerClientで構成されています。 処理するパーティションを分散することで、異なるインスタンス間でワークロードのバランスを取ります。

  • チェックポイント: これは、リーダーがパーティション イベント シーケンス内の位置をマークまたはコミットするプロセスです。 チェックポイント処理はコンシューマーの責任で行います。この処理はコンシューマー グループ内でパーティションごとに発生します。 つまり、コンシューマー グループごとに、各パーティション リーダーは、イベント ストリーム内でのその現在の位置を追跡する必要があり、データ ストリームが完了したと見なしたときにサービスに通知することができます。

    リーダーがパーティションから切断し、その後再び接続すると、該当するコンシューマー グループ内の該当するパーティションの最後のリーダーによって最後に送信されたチェックポイントから読み取りが開始されます。 リーダーは接続の際に、このオフセットをイベント ハブに渡して、読み取りを開始する場所を指定します。 このように、チェックポイント処理を使用することで、ダウンストリーム アプリケーションごとにイベントに "完了" のマークを付けると共に、異なるコンピューター上で実行中のリーダー間でフェールオーバーが発生した場合に回復性をもたらすことができます。 このチェックポイント処理で、より小さなオフセットを指定すると、古いデータに戻ることができます。 このメカニズムにより、チェックポイント処理ではフェールオーバーの回復性とイベント ストリームの再生の両方を実現できます。

    BlobCheckpointStore は、読み込みと更新のチェックポイントのバランスを取るために EventHubConsumerClient に必要なキー メソッドを実装するクラスです。

Azure Blob Storageを使用して をCheckpointStore作成する

次のコード スニペットを使用して を作成します CheckpointStore。 ストレージ アカウントに接続文字列を指定する必要があります。

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Azure Blob Storage を使用してイベントをチェックポイント処理する

Azure Blob Storageを使用して受信したイベントをチェックポイント処理するには、SubscriptionEventHandlers インターフェイスと互換性のあるオブジェクトを、メソッドを呼び出すupdateCheckpoint()コードと共に渡す必要があります。

この例では、 SubscriptionHandlersSubscriptionEventHandlers を 実装し、チェックポイント処理も処理します。

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

トラブルシューティング

ログの有効化

環境変数を AZURE_LOG_LEVEL 次のいずれかの値に設定して、 へのログ記録を stderr有効にすることができます。

  • verbose
  • info
  • warning
  • error

@azure/ロガー パッケージをインポートし、いずれかのログ レベル値を使用して関数をsetLogLevel呼び出すことで、ログ レベルをプログラムで設定することもできます。

ログ レベルをプログラムまたは環境変数を使用して AZURE_LOG_LEVEL 設定すると、選択したログ レベル以下のログ レベルを使用して書き込まれたログが出力されます。 たとえば、ログ レベルを に info設定すると、レベル warning に対して書き込まれ、 error 出力されるログも出力されます。 この SDK は、どのレベルにログを記録するかを決定する際に、TypeScript 用の Azure SDK ガイドライン に従います。

または、このライブラリを使用するときにログを DEBUG 取得するように環境変数を設定することもできます。 これは、依存関係rhea-promiserheaからログを出力する場合にも役立ちます。

メモ: AZURE_LOG_LEVEL設定されている場合は、DEBUG よりも優先されます。 AZURE_LOG_LEVELを指定 azure する場合や setLogLevel を呼び出す場合は、DEBUG を使用してライブラリを指定しないでください。

このライブラリの使用時にデバッグ ログを表示するには、次の環境変数を取得します。

  • Eventhubs Checkpointstore BLOB から情報レベルのデバッグ ログのみを取得します。
export DEBUG=azure:eventhubs-checkpointstore-blob:info

ファイルへのログ記録

  • 上記のようにログ記録を有効にし、次のようにテスト スクリプトを実行します。

    • テスト スクリプトのログ ステートメントは に out.log 移動し、sdk のログ ステートメントは に移動します debug.log

      node your-test-script.js > out.log 2>debug.log
      
    • テスト スクリプトと sdk からのログ ステートメントは、stderr を stdout (&1) にリダイレクトして同じファイル out.log に移動し、stdout をファイルにリダイレクトします。

      node your-test-script.js >out.log 2>&1
      
    • テスト スクリプトと sdk からのログ ステートメントは、同じファイル out.logに移動します。

      node your-test-script.js &> out.log
      

次の手順

詳細な例については、 サンプル ディレクトリを参照してください。

共同作成

このライブラリに投稿する場合、コードをビルドしてテストする方法の詳細については、投稿ガイドを参照してください。

インプレッション数