Redis 用の Jedis クライアント ライブラリを使用する
Azure Event Hubsチェックポイント ストアは、Azure Event Hubsからのイベントの処理中にチェックポイントを格納するために使用できます。
このパッケージでは、チェックポイントとパーティションの所有権情報を維持するための永続的なストアとして Redis を使用します。
JedisRedisCheckpointStore
このパッケージで提供される は、 にEventProcessorClient
接続できます。
ソース コード| API リファレンス ドキュメント | 製品ドキュメント | サンプル
作業の開始
前提条件
- Java Development Kit (JDK) バージョン 8 以降。
- Maven
- Microsoft Azure サブスクリプション
- 無料アカウントは、次の場合に作成できます。 https://azure.microsoft.com
- Azure Event Hubs インスタンス
- Azure Portal を使用してイベント ハブを作成するためのステップ バイ ステップ ガイド
- Azure Redis Cache または適切な代替 Redis サーバー
- Azure Portal を使用して Redis Cache を作成するためのステップ バイ ステップ ガイド
パッケージを組み込む
BOM ファイルを含める
ライブラリの一般提供 (GA) バージョンに依存するには、azure-sdk-bom をプロジェクトに含めてください。 次のスニペットでは、{bom_version_to_target} プレースホルダーをバージョン番号に置き換えます。 BOM の詳細については、 AZURE SDK BOM README に関するページを参照してください。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-sdk-bom</artifactId>
<version>{bom_version_to_target}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
次に、次に示すように、バージョン タグを使用せずに、依存関係セクションに直接依存関係を含めます。
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
</dependency>
</dependencies>
直接依存関係を含める
BOM に存在しない特定のバージョンのライブラリに依存する場合は、次のように直接依存関係をプロジェクトに追加します。
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
<version>1.0.0-beta.2</version>
</dependency>
ストレージ コンテナー クライアントを認証する
の JedisCheckpointStore
インスタンスを作成するには、 オブジェクトを JedisPool
作成する必要があります。 この JedisPool
オブジェクトを作成するには、ホスト名 String と主キー String が必要です。 これらは、次に示すように、オブジェクトを作成 JedisPool
するために使用できます。
主要な概念
主な概念については、ここで詳しく説明 します。
例
JedisPool のインスタンスを作成する
Azure Redis Cache を使用して JedisPool のインスタンスを作成するには、「Java でAzure Cache for Redisを使用してホスト名とアクセス キーをフェッチする」の手順に従います。 それ以外の場合は、実行中の Redis インスタンスからの接続情報を使用します。
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
.password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
.ssl(true)
.build();
String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);
// Do things with JedisPool.
// Finally, dispose of resource
jedisPool.close();
イベント プロセッサ クライアントを使用してイベントを使用する
Event Hub のすべてのパーティションのイベントを使用するには、特定のコンシューマー グループの を EventProcessorClient
作成します。 イベント ハブが作成されると、開始に使用できる既定のコンシューマー グループが提供されます。
は EventProcessorClient
、イベントの処理を指定したコールバック関数に委任します。これにより、プロセッサが基になるコンシューマー操作を管理する責任を負う間、値を提供するために必要なロジックに集中できます。
この例では、 を構築 EventProcessor
することに焦点を JedisRedisCheckpointStore
当て、、および単純なコールバック関数を使用して、Event Hubs から受信したイベントを処理し、コンソールに書き込み、各イベントの後に Blob Storage のチェックポイントを更新します。
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
.password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
.ssl(true)
.build();
String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.connectionString("<< EVENT HUB NAMESPACE CONNECTION STRING >>")
.eventHubName("<< EVENT HUB NAME >>")
.checkpointStore(new JedisCheckpointStore(jedisPool))
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
})
.processError(context -> {
System.out.println("Error occurred while processing events " + context.getThrowable().getMessage());
})
.buildEventProcessorClient();
// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();
// (for demo purposes only - adding sleep to wait for receiving events)
// Your application will probably keep the eventProcessorClient alive until the program ends.
TimeUnit.SECONDS.sleep(2);
// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();
// Dispose of JedisPool resource.
jedisPool.close();
トラブルシューティング
クライアントのログ記録を有効にする
Azure SDK for Java には、アプリケーション エラーのトラブルシューティングと解決の迅速化に役立つ一貫したログ記録のストーリーが用意されています。 生成されたログでは、最終状態に達する前のアプリケーションのフローがキャプチャされ、根本原因を特定するのに役立ちます。 ログ記録の有効化に関するガイダンスについては、ログ Wiki を参照してください。
次のステップ
サンプルについては、 こちらを参照してください。
共同作成
このプロジェクトの積極的なコントリビューターになりたい場合は、投稿 ガイドライン を参照してください。
Azure SDK for Java