次の方法で共有


Java 用のAzure Event Hubs チェックポイント ストア クライアント ライブラリ - バージョン 1.17.0

ストレージ BLOB の使用

Azure Event Hubsチェックポイント ストアは、Azure Event Hubsからのイベントの処理中にチェックポイントを格納するために使用できます。 このパッケージでは、チェックポイントとパーティションの所有権情報を維持するための永続的なストアとしてストレージ BLOB を使用します。 BlobCheckpointStoreこのパッケージで提供される は、 にEventProcessor接続できます。

ソース コード | API リファレンス ドキュメント | 製品ドキュメント | サンプル

作業の開始

前提条件

パッケージを組み込む

BOM ファイルを含める

ライブラリの一般提供 (GA) バージョンに依存するには、azure-sdk-bom をプロジェクトに含めてください。 次のスニペットでは、{bom_version_to_target} プレースホルダーをバージョン番号に置き換えます。 BOM の詳細については、 AZURE SDK BOM README に関するページを参照してください。

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

次に、次に示すように、バージョン タグを使用せずに、依存関係セクションに直接依存関係を含めます。

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
  </dependency>
</dependencies>

直接依存関係を含める

BOM に存在しない特定のバージョンのライブラリに依存する場合は、次のように直接依存関係をプロジェクトに追加します。

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    <version>1.17.0</version>
</dependency>

ストレージ コンテナー クライアントを認証する

BlobCheckpointStoreContainerAsyncClientインスタンスを作成するには、最初に書き込みアクセスと接続文字列を含む適切な SAS トークンを使用して を作成する必要があります。 これを可能にするには、ストレージ アカウントのアカウント SAS (共有アクセス署名) 文字列が必要です。 詳細については、 SAS トークンに関するページを参照してください。

主要な概念

チェックポイント機能

"チェックポイント処理" とは、リーダーがパーティションにおけるイベント シーケンス内の位置をマークまたはコミットするために使用する処理です。 チェックポイント処理はコンシューマーの責任で行います。この処理はコンシューマー グループ内でパーティションごとに発生します。 つまり、コンシューマー グループごとに、各パーティション リーダーは、イベント ストリーム内でのその現在の位置を追跡する必要があり、データ ストリームが完了したと見なしたときにサービスに通知することができます。 リーダーがパーティションから切断し、その後再び接続すると、該当するコンシューマー グループ内の該当するパーティションの最後のリーダーによって最後に送信されたチェックポイントから読み取りが開始されます。 リーダーは接続の際に、このオフセットをイベント ハブに渡して、読み取りを開始する場所を指定します。 このように、チェックポイント処理を使用することで、ダウンストリーム アプリケーションごとにイベントに "完了" のマークを付けると共に、異なるコンピューター上で実行中のリーダー間でフェールオーバーが発生した場合に回復性をもたらすことができます。 このチェックポイント処理で、より小さなオフセットを指定すると、古いデータに戻ることができます。 このメカニズムにより、チェックポイント処理ではフェールオーバーの回復性とイベント ストリームの再生の両方を実現できます。

シーケンス番号の & オフセット

どちらのオフセット & シーケンス番号も、パーティション内のイベントの位置を参照します。 クライアント側のカーソルと考えることができます。 オフセットはイベントのバイト位置です。 オフセット/シーケンス番号を使用すると、イベント コンシューマー (リーダー) は、イベントの読み取りを開始するイベント ストリーム内のポイントを指定できます。 タイムスタンプを指定して、指定されたタイムスタンプの後にのみエンキューされたイベントを受け取ることができます。 コンシューマーは、Event Hubs サービスの外部に独自のオフセット値を格納する責任があります。 パーティション内では、各イベントにはオフセット、シーケンス番号、エンキューされた日時のタイムスタンプが含まれます。

SAS トークンを使用して Storage コンテナーのインスタンスを作成する

BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
    .connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
    .containerName("<CONTAINER_NAME>")
    .sasToken("<SAS_TOKEN>")
    .buildAsyncClient();

イベント プロセッサ クライアントを使用してイベントを使用する

Event Hub のすべてのパーティションのイベントを使用するには、特定のコンシューマー グループの を EventProcessorClient 作成します。 イベント ハブが作成されると、開始に使用できる既定のコンシューマー グループが提供されます。

EventProcessorClient 、イベントの処理を指定したコールバック関数に委任します。これにより、プロセッサが基になるコンシューマー操作を管理する責任を負う間、値を提供するために必要なロジックに集中できます。

この例では、 を構築 EventProcessorすることに焦点を BlobCheckpointStore当て、、および単純なコールバック関数を使用して、Event Hubs から受信したイベントを処理し、コンソールに書き込み、各イベントの後に Blob Storage のチェックポイントを更新します。

BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
    .connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
    .containerName("<CONTAINER_NAME>")
    .sasToken("<SAS_TOKEN>")
    .buildAsyncClient();

EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .connectionString("<< EVENT HUB CONNECTION STRING >>")
    .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
    .processEvent(eventContext -> {
        System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
            + "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
    })
    .processError(errorContext -> {
        System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
    })
    .buildEventProcessorClient();

// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();

// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);

// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();

トラブルシューティング

クライアントのログ記録を有効にする

Azure SDK for Java には、アプリケーション エラーのトラブルシューティングと解決の迅速化に役立つ一貫したログ記録のストーリーが用意されています。 生成されたログでは、最終状態に達する前のアプリケーションのフローがキャプチャされ、根本原因を特定するのに役立ちます。 ログ記録の有効化に関するガイダンスについては、ログ Wiki を参照してください。

既定の SSL ライブラリ

すべてのクライアント ライブラリは、Tomcat ネイティブの Boring SSL ライブラリを既定で使用して、SSL 操作にネイティブレベルのパフォーマンスを実現しています。 Boring SSL ライブラリは、Linux、macOS、Windows のネイティブ ライブラリを含んだ uber jar であり、JDK 内の既定の SSL 実装よりも優れたパフォーマンスを備えています。 依存関係のサイズを縮小する方法など、詳細については、Wiki の「パフォーマンス チューニング」セクションを参照してください。

次のステップ

サンプルについては、 こちらを参照してください

共同作成

このプロジェクトの積極的なコントリビューターになりたい場合は、投稿 ガイドライン を参照してください。

インプレッション数