EventProcessorClientBuilder クラス
- java.
lang. Object - com.
azure. messaging. eventhubs. EventProcessorClientBuilder
- com.
実装
public class EventProcessorClientBuilder
implements TokenCredentialTrait<EventProcessorClientBuilder>, AzureNamedKeyCredentialTrait<EventProcessorClientBuilder>, ConnectionStringTrait<EventProcessorClientBuilder>, AzureSasCredentialTrait<EventProcessorClientBuilder>, AmqpTrait<EventProcessorClientBuilder>, ConfigurationTrait<EventProcessorClientBuilder>
このクラスは、 の構成とインスタンス化に役立つ fluent builder API を提供します EventProcessorClient。 を呼び出すと buildEventProcessorClient() 、 の新しいインスタンスが EventProcessorClient構築されます。
の EventProcessorClientインスタンスを作成するには、 次のフィールドが必要です。
CheckpointStore - チェックポイントとパーティションの所有権情報を格納して、負荷分散とチェックポイント処理されたイベントを有効にする CheckpointStore の実装。
processEvent(Consumer<EventContext> processEvent) または processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize, Duration maxWaitTime) - イベント ハブから受信したイベントを処理するコールバック。
processError(Consumer<ErrorContext> processError) - EventProcessorClient の実行中に発生する可能性があるエラーを処理するコールバック。
Azure Event Hubsに対して操作を実行する資格情報。 これらは、次のいずれかの方法を使用して設定できます。
- connectionString(String connectionString) 特定のイベント ハブへの接続文字列を使用します。
- connectionString(String connectionString, String eventHubName) イベント ハブ 名前空間 の接続文字列とイベント ハブ名を指定します。
- credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential) には、完全修飾名前空間、イベント ハブ名、およびイベント ハブを使用する権限を持つ資格情報のセットが含まれます。
- credential(TokenCredential credential)、credential(AzureSasCredential credential)、または credential(AzureNamedKeyCredential credential) および とeventHubName(String eventHubName)共fullyQualifiedNamespace(String fullyQualifiedNamespace)に。 Event Hub を使用するための完全修飾名前空間、イベント ハブ名、および承認された資格情報。
このドキュメントに示す例では、認証に DefaultAzureCredential という名前の資格情報オブジェクトを使用します。これは、ローカルの開発環境や運用環境を含むほとんどのシナリオに適しています。 さらに、運用環境での認証に マネージド ID を 使用することをお勧めします。 認証のさまざまな方法とそれに対応する資格情報の種類の詳細については、 Azure ID のドキュメントを参照してください。
サンプル: を構築する EventProcessorClient
次のコード サンプルは、プロセッサ クライアントの作成を示しています。 プロセッサ クライアントは、複数の実行中のインスタンス間で負荷分散を行い、チェックポイント処理を実行でき、ネットワーク障害などの一時的な障害時に再接続できるため、運用シナリオに推奨されます。 次のサンプルではメモリCheckpointStore内を使用していますが、azure-messaging-eventhubs-checkpointstore-blob は、Azure Blob Storageによってサポートされるチェックポイント ストアを提供します。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.checkpointStore(new SampleCheckpointStore())
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
フィールドの概要
修飾子と型 | フィールドと説明 |
---|---|
static final Duration |
DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
既定の負荷分散の更新間隔。 |
static final Duration |
DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
既定の所有権の有効期限。 |
コンストラクターの概要
コンストラクター | 説明 |
---|---|
EventProcessorClientBuilder() |
EventProcessorClientBuilder の新しいインスタンスを作成します。 |
メソッドの概要
メソッドの継承元: java.lang.Object
フィールドの詳細
DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
public static final Duration DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
既定の負荷分散の更新間隔。 分散間隔は、クライアントとストレージ アカウントの間の待機時間を考慮する必要があります。
DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
public static final Duration DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
既定の所有権の有効期限。
コンストラクターの詳細
EventProcessorClientBuilder
public EventProcessorClientBuilder()
EventProcessorClientBuilder の新しいインスタンスを作成します。
メソッドの詳細
buildEventProcessorClient
public EventProcessorClient buildEventProcessorClient()
これにより、このビルダーで設定されたオプションで構成された新しい EventProcessorClient が作成されます。 このメソッドを呼び出すたびに、 の EventProcessorClient新しいインスタンスが返されます。
これにより EventProcessorClient 処理されるすべてのパーティションは、それぞれのパーティションで使用可能なイベントから earliest() 処理を開始します。
Returns:
checkpointStore
public EventProcessorClientBuilder checkpointStore(CheckpointStore checkpointStore)
パーティションのCheckpointStoreEventProcessorClient所有権とチェックポイント情報の格納に使用する を設定します。
ユーザーは、必要に応じて、所有権とチェックポイント情報を格納する 独自の CheckpointStore 実装を提供できます。
Parameters:
Returns:
clientOptions
public EventProcessorClientBuilder clientOptions(ClientOptions clientOptions)
プロセッサ クライアントのクライアント オプションを設定します。 クライアント オプションに設定されているアプリケーション ID は、トレースに使用されます。 に ClientOptions
設定されたヘッダーは現在使用されていませんが、AMQP メッセージに追加するために、以降のリリースで使用できます。
Parameters:
Returns:
configuration
public EventProcessorClientBuilder configuration(Configuration configuration)
サービス クライアントの構築中に使用される構成ストアを設定します。 指定しない場合は、既定の構成ストアを使用して を構成します EventHubAsyncClient。 構築中に構成設定を使用してバイパスするには、 を使用 NONE します。
Parameters:
Returns:
connectionString
public EventProcessorClientBuilder connectionString(String connectionString)
接続文字列を指定した資格情報を Event Hub インスタンスに設定します。
接続文字列が Event Hubs 名前空間からコピーされる場合、必要な目的の Event Hub への名前が含まれていない可能性があります。 この場合、接続文字列の末尾に "EntityPath=EVENT_HUB_NAME" を追加することで、名前を手動で追加できます。 たとえば、"EntityPath=telemetry-hub" などです。
共有アクセス ポリシーを Event Hub 自体に直接定義した場合、そのイベント ハブから接続文字列をコピーすると、名前を含む接続文字列が生成されます。
Parameters:
Returns:
connectionString
public EventProcessorClientBuilder connectionString(String connectionString, String eventHubName)
接続文字列に指定された資格情報を Event Hubs 名前空間に設定し、名前を特定の Event Hub インスタンスに設定します。
Parameters:
Returns:
consumerGroup
public EventProcessorClientBuilder consumerGroup(String consumerGroup)
イベントを使用するコンシューマー グループ名を EventProcessorClient 設定します。
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(AzureNamedKeyCredential credential)
接続する Event Hub インスタンスの資格情報と、それに対する承認方法を設定します。
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(AzureSasCredential credential)
接続する Event Hub インスタンスの資格情報と、それに対する承認方法を設定します。
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(TokenCredential credential)
サービスに TokenCredential 送信された要求を承認するために使用される を設定します。 型の適切な使用方法の詳細については、Azure SDK for Java の ID と認証 に関するドキュメントを TokenCredential 参照してください。
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureNamedKeyCredential credential)
接続する Event Hub インスタンスの資格情報と、それに対する承認方法を設定します。
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureSasCredential credential)
接続する Event Hub インスタンスの資格情報と、それに対する承認方法を設定します。
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential)
接続する Event Hub インスタンスの資格情報と、それに対する承認方法を設定します。
Parameters:
Returns:
customEndpointAddress
public EventProcessorClientBuilder customEndpointAddress(String customEndpointAddress)
Event Hubs サービスに接続するときのカスタム エンドポイント アドレスを設定します。 これは、ネットワークが標準のAzure Event Hubs エンドポイント アドレスへの接続は許可していないけれど、中継局経由での接続を許可している場合に便利です。 (例: https://my.custom.endpoint.com:55300)。
ポートが指定されていない場合は、 の既定の transportType(AmqpTransportType transport) ポートが使用されます。
Parameters:
Returns:
eventHubName
public EventProcessorClientBuilder eventHubName(String eventHubName)
クライアントを接続するイベント ハブの名前を設定します。
Parameters:
Returns:
fullyQualifiedNamespace
public EventProcessorClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)
Event Hubs 名前空間の完全修飾名を設定します。
Parameters:
Returns:
initialPartitionEventPosition
public EventProcessorClientBuilder initialPartitionEventPosition(Map
パーティションのチェックポイントが に存在しない場合に、各パーティションで使用するイベント位置を含むマップを CheckpointStore設定します。 このマップは、パーティション ID からキーが設定されます。
の構築時には、 のinitialPartitionEventPosition
オーバーロードを EventProcessorClient1 つだけ使用する必要があります。
Parameters:
Returns:
initialPartitionEventPosition
public EventProcessorClientBuilder initialPartitionEventPosition(Function
そのパーティションのチェックポイントが に存在しない場合は、各パーティションの既定の開始位置を設定します CheckpointStore。
の構築時には、 のinitialPartitionEventPosition
オーバーロードを EventProcessorClient1 つだけ使用する必要があります。
Parameters:
Returns:
loadBalancingStrategy
public EventProcessorClientBuilder loadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy)
EventProcessorClientはLoadBalancingStrategy、パーティションの所有権の要求に使用されます。 既定では、 BALANCED アプローチが使用されます。
Parameters:
Returns:
loadBalancingUpdateInterval
public EventProcessorClientBuilder loadBalancingUpdateInterval(Duration loadBalancingUpdateInterval)
負荷分散の更新サイクルから次のサイクルまでの期間。 これは通常、パーティションの所有権が更新される間隔でもあります。 既定では、この間隔は 10 秒に設定されています。
Parameters:
Returns:
partitionOwnershipExpirationInterval
public EventProcessorClientBuilder partitionOwnershipExpirationInterval(Duration partitionOwnershipExpirationInterval)
所有するプロセッサ インスタンスによって更新されない場合にパーティションの所有権が期限切れになるまでの時間。 これは、非アクティブなプロセッサによって以前に所有されていたパーティションの所有権を引き継ぐ前に、このプロセッサ インスタンスが待機する期間です。 既定では、この期間は 1 分に設定されます。
Parameters:
Returns:
prefetchCount
public EventProcessorClientBuilder prefetchCount(int prefetchCount)
受信操作が現在アクティブかどうかに関係なく、各コンシューマーがローカルでアクティブに受信およびキューに入れるイベントの数を制御するために受信側が使用する数を設定します。
Parameters:
Returns:
processError
public EventProcessorClientBuilder processError(Consumer
イベントの処理中にエラーが発生したときに呼び出される関数。 入力には、エラーが発生したパーティション情報が含まれています。
Parameters:
Returns:
processEvent
public EventProcessorClientBuilder processEvent(Consumer
この EventProcessorClientによって受信された各イベントに対して呼び出される関数。 入力には、パーティション コンテキストとイベント データが含まれます。
Parameters:
Returns:
processEvent
public EventProcessorClientBuilder processEvent(Consumer
この EventProcessorClientによって受信された各イベントに対して呼び出される関数。 入力には、パーティション コンテキストとイベント データが含まれます。 最大待機時間が設定されている場合、受信はその期間のイベントの受信を待機し、 がイベントを受信していない場合、コンシューマーは null イベント データを使用して呼び出されます。
Parameters:
Returns:
processEventBatch
public EventProcessorClientBuilder processEventBatch(Consumer
この EventProcessorClientによって受信された各イベントに対して呼び出される関数。 入力には、パーティション コンテキストとイベント データが含まれます。 最大待機時間が設定されている場合、受信はその期間のイベントの受信を待機し、 がイベントを受信していない場合、コンシューマーは null イベント データを使用して呼び出されます。
Parameters:
Returns:
processEventBatch
public EventProcessorClientBuilder processEventBatch(Consumer
この EventProcessorClientによって受信された各イベントに対して呼び出される関数。 入力には、パーティション コンテキストとイベント データが含まれます。 最大待機時間が設定されている場合、受信はその期間のイベントの受信を待機し、 がイベントを受信していない場合、コンシューマーは null イベント データを使用して呼び出されます。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.checkpointStore(new SampleCheckpointStore())
.processEventBatch(eventBatchContext -> {
eventBatchContext.getEvents().forEach(eventData -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventBatchContext.getPartitionContext().getPartitionId(),
eventData.getSequenceNumber());
});
}, 50, Duration.ofSeconds(30))
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
Parameters:
Returns:
processPartitionClose
public EventProcessorClientBuilder processPartitionClose(Consumer
パーティションの処理が停止したときに呼び出される関数。 入力には、パーティション情報と、このパーティションのイベント処理を停止する理由が含まれます。
Parameters:
Returns:
processPartitionInitialization
public EventProcessorClientBuilder processPartitionInitialization(Consumer
パーティションの処理が開始される前に呼び出される関数。 入力には、パーティション情報と、 でチェックポイントが使用できない CheckpointStore場合に使用されるイベントを処理するための既定の開始位置が含まれます。 別の開始位置が優先される場合、ユーザーはこの位置を更新できます。
Parameters:
Returns:
proxyOptions
public EventProcessorClientBuilder proxyOptions(ProxyOptions proxyOptions)
に使用するプロキシ構成を設定します EventHubAsyncClient。 プロキシが構成されている場合は、 AMQP_WEB_SOCKETS トランスポートの種類に を使用する必要があります。
Parameters:
Returns:
retry
@Deprecated
public EventProcessorClientBuilder retry(AmqpRetryOptions retryOptions)
非推奨
の再試行ポリシーを設定します EventHubAsyncClient。 指定しない場合は、既定の再試行オプションが使用されます。
Parameters:
Returns:
retryOptions
public EventProcessorClientBuilder retryOptions(AmqpRetryOptions retryOptions)
の再試行ポリシーを設定します EventHubAsyncClient。 指定しない場合は、既定の再試行オプションが使用されます。
Parameters:
Returns:
trackLastEnqueuedEventProperties
public EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)
イベント プロセッサが、関連付けられたパーティションで最後にエンキューされたイベントに関する情報を要求し、イベントの受信時にその情報を追跡するかどうかを設定します。
パーティションの最後のエンキュー イベントに関する情報が追跡されている場合、Event Hubs サービスから受信した各イベントは、それ以外の場合は行わないパーティションに関するメタデータを保持します。 これにより、Event Hub クライアントを使用してパーティション プロパティの要求を定期的に行うことを考慮すると、通常は有利なトレードオフとなる、追加のネットワーク帯域幅消費が少なくなります。
Parameters:
true
結果のイベントが、そのパーティションの最後のエンキューされた情報を追跡する場合。 false
それ以外の場合は 。
Returns:
transportType
public EventProcessorClientBuilder transportType(AmqpTransportType transport)
Azure Event Hubsとの通信がすべて行われるトランスポートの種類を設定します。 既定値は AMQP です。
Parameters:
Returns:
適用対象
Azure SDK for Java