ServiceBusReceiverAsyncClient クラス
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusReceiverAsyncClient
- com.
実装
public final class ServiceBusReceiverAsyncClient
implements AutoCloseable
Azure Service Bus キューまたはトピック/サブスクリプションからの受信を担当するServiceBusReceivedMessage非同期レシーバー。
このドキュメントに示す例では、認証に DefaultAzureCredential という名前の資格情報オブジェクトを使用します。これは、ローカルの開発環境や運用環境など、ほとんどのシナリオに適しています。 さらに、運用環境での認証には マネージド ID を 使用することをお勧めします。 さまざまな認証方法と、それに対応する資格情報の種類の詳細については、 Azure Identity のドキュメントを参照してください。
サンプル: を作成する ServiceBusReceiverAsyncClient
次のコード サンプルは、非同期クライアント ServiceBusReceiverAsyncClientの作成を示しています。 は fullyQualifiedNamespace
Service Bus 名前空間のホスト名です。 Azure Portal を介して Event Hubs 名前空間に移動した後、"Essentials" パネルの下に一覧表示されます。 使用される資格情報は、 DefaultAzureCredential
デプロイと開発でよく使用される資格情報を組み合わせ、実行環境に基づいて使用する資格情報を選択するためです。 PEEK_LOCK (既定の受信モード) と disableAutoComplete() は、ユーザーがメッセージの受け取りを制御できるように 強くお 勧めします。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.receiver()
.disableAutoComplete()
.queueName(queueName)
.buildAsyncClient();
// When users are done with the receiver, dispose of the receiver.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
asyncReceiver.close();
サンプル: Service Bus リソースからすべてのメッセージを受信する
これにより、Service Bus からメッセージの無限ストリームが返されます。 ストリームは、サブスクリプションが破棄されるか、またはその他のターミナル シナリオで終了します。 詳細については、「 receiveMessages() 」を参照してください。
// Keep a reference to `subscription`. When the program is finished receiving messages, call
// subscription.dispose(). This will stop fetching messages from the Service Bus.
// Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
Disposable subscription = asyncReceiver.receiveMessages()
.flatMap(message -> {
System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
System.out.printf("Contents of message as string: %s%n", message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return asyncReceiver.complete(message);
} else {
return asyncReceiver.abandon(message);
}
})
.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
// When program ends, or you're done receiving all messages, dispose of the receiver.
// Clients should be long-lived objects as they
// require resources and time to establish a connection to the service.
asyncReceiver.close();
サンプル: Service Bus エンティティからモードで RECEIVE_AND_DELETE メッセージを受信する
次のコード サンプルは、 を使用した非同期クライアント ServiceBusReceiverAsyncClient の作成を RECEIVE_AND_DELETE示しています。 は fullyQualifiedNamespace
Service Bus 名前空間のホスト名です。 Azure Portal を介して Event Hubs 名前空間に移動した後、"Essentials" パネルの下に一覧表示されます。 使用される資格情報は、 DefaultAzureCredential
デプロイと開発でよく使用される資格情報を組み合わせ、実行環境に基づいて使用する資格情報を選択するためです。 RECEIVE_AND_DELETEこのモードを使用したメッセージの受信の詳細については、ドキュメントを参照してください。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// Keep a reference to `subscription`. When the program is finished receiving messages, call
// subscription.dispose(). This will stop fetching messages from the Service Bus.
Disposable subscription = Flux.usingWhen(
Mono.fromCallable(() -> {
// Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
// peek lock mode is used. In peek lock mode, users are responsible for settling messages.
return new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.receiver()
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.queueName(queueName)
.buildAsyncClient();
}), receiver -> {
return receiver.receiveMessages();
}, receiver -> {
return Mono.fromRunnable(() -> receiver.close());
})
.subscribe(message -> {
// Messages received in RECEIVE_AND_DELETE mode do not have to be settled because they are automatically
// removed from the queue.
System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
System.out.printf("Contents of message as string: %s%n", message.getBody());
},
error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
サンプル: 特定のセッションからメッセージを受信する
特定のセッションからメッセージをフェッチするには、 に ServiceBusSessionReceiverClientBuilder 切り替えて、セッション レシーバー クライアントをビルドします。 を使用して acceptSession(String sessionId) 、セッションバインド ServiceBusReceiverAsyncClientを作成します。 このサンプルでは、 キューの作成時に Service Bus セッションが有効になっていることを前提としています。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
// successfully locked.
// `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
// operations complete.
// `Mono.usingWhen` can also be used if the resource closure returns a single item.
Flux<Void> sessionMessages = Flux.usingWhen(
sessionReceiver.acceptSession("<<my-session-id>>"),
receiver -> {
// Receive messages from <<my-session-id>> session.
return receiver.receiveMessages().flatMap(message -> {
System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
});
},
receiver -> Mono.fromRunnable(() -> {
// Dispose of resources.
receiver.close();
sessionReceiver.close();
}));
// When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
// is non-blocking and kicks off the operation.
Disposable subscription = sessionMessages.subscribe(
unused -> {
}, error -> System.err.print("Error receiving message from session: " + error),
() -> System.out.println("Completed receiving from session."));
サンプル: 使用可能な最初のセッションからメッセージを受信する
最初に使用可能なセッションからのメッセージを処理するには、 に ServiceBusSessionReceiverClientBuilder 切り替えて、セッション レシーバー クライアントをビルドします。 を使用して acceptNextSession() 、メッセージを処理する最初の使用可能なセッションを見つけます。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// Creates a client to receive messages from the first available session. It waits until
// AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
// completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
receiver -> receiver.receiveMessages().flatMap(message -> {
System.out.println("Received message: " + message.getBody());
// Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
}),
receiver -> Mono.fromRunnable(() -> {
// Dispose of the receiver and sessionReceiver when done receiving messages.
receiver.close();
sessionReceiver.close();
}));
// This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
// operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
// receiving messages.
Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
サンプル: Service Bus エンティティからのメッセージのレート制限の使用
特定の時刻に受信するメッセージの数を制限する必要があるメッセージ レシーバーの場合は、 を使用 BaseSubscriber#request(long)できます。
// This is a non-blocking call. The program will move to the next line of code after setting up the operation.
asyncReceiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
private static final int NUMBER_OF_MESSAGES = 5;
private final AtomicInteger currentNumberOfMessages = new AtomicInteger();
@Override
protected void hookOnSubscribe(Subscription subscription) {
// Tell the Publisher we only want 5 message at a time.
request(NUMBER_OF_MESSAGES);
}
@Override
protected void hookOnNext(ServiceBusReceivedMessage message) {
// Process the ServiceBusReceivedMessage
// If the number of messages we have currently received is a multiple of 5, that means we have reached
// the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
// that the subscriber is ready to get more messages from upstream.
if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
request(NUMBER_OF_MESSAGES);
}
}
});
メソッドの概要
メソッドの継承元: java.lang.Object
メソッドの詳細
abandon
public Mono
を ServiceBusReceivedMessage破棄します。 これにより、メッセージを再び処理できるようになります。 メッセージを破棄すると、メッセージの配信数が増えます。
Parameters:
Returns:
abandon
public Mono
を破棄すると、 ServiceBusReceivedMessage メッセージのプロパティが更新されます。 これにより、メッセージを再び処理できるようになります。 メッセージを破棄すると、メッセージの配信数が増えます。
Parameters:
Returns:
close
public void close()
サービスへの基になるリンクを閉じて、コンシューマーを破棄します。
commitTransaction
public Mono
トランザクションとそのトランザクションに関連付けられているすべての操作をコミットします。
トランザクションの作成と使用
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Parameters:
Returns:
complete
public Mono
を完了します ServiceBusReceivedMessage。 これにより、サービスからメッセージが削除されます。
Parameters:
Returns:
complete
public Mono
指定したオプションを ServiceBusReceivedMessage 使用して を完了します。 これにより、サービスからメッセージが削除されます。
Parameters:
Returns:
createTransaction
public Mono
新しいサービス側トランザクションを開始します。 は ServiceBusTransactionContext 、このトランザクションに含める必要があるすべての操作に渡す必要があります。
トランザクションの作成と使用
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Returns:
deadLetter
public Mono
ServiceBusReceivedMessageを配信不能サブキューに移動します。
Parameters:
Returns:
deadLetter
public Mono
ServiceBusReceivedMessage指定したオプションを使用して、 を配信不能サブキューに移動します。
Parameters:
Returns:
defer
public Mono
を延期します ServiceBusReceivedMessage。 これにより、メッセージが遅延サブキューに移動されます。
Parameters:
Returns:
defer
public Mono
オプションを ServiceBusReceivedMessage 設定して を延期します。 これにより、メッセージが遅延サブキューに移動されます。
Parameters:
Returns:
getEntityPath
public String getEntityPath()
このクライアントが操作する Service Bus リソースを取得します。
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
接続が関連付けられている完全修飾 Service Bus 名前空間を取得します。 これは、 と似ている可能性があります {yournamespace}.servicebus.windows.net
。
Returns:
getIdentifier
public String getIdentifier()
のインスタンス ServiceBusReceiverAsyncClientの識別子を取得します。
Returns:
getSessionId
public String getSessionId()
このレシーバーがセッション レシーバーの場合は、セッションの SessionId を取得します。
Returns:
getSessionState
public Mono
このレシーバーがセッション レシーバーの場合は、セッションの状態を取得します。
Returns:
peekMessage
public Mono
受信側またはメッセージ ソースの状態を変更せずに、次のアクティブなメッセージを読み取ります。 を最初に peek()
呼び出すと、この受信側の最初のアクティブ メッセージがフェッチされます。 後続の呼び出しのたびに、エンティティ内の後続のメッセージがフェッチされます。
Returns:
peekMessage
public Mono
指定したシーケンス番号から、受信側またはメッセージ ソースの状態を変更せずに、アクティブなメッセージの次を読み取ります。
Parameters:
Returns:
peekMessages
public Flux
受信側またはメッセージ ソースの状態を変更せずに、アクティブなメッセージの次のバッチを読み取ります。
Parameters:
Returns:
peekMessages
public Flux
指定したシーケンス番号から、受信側またはメッセージ・ソースの状態を変更せずに、アクティブ・メッセージの次のバッチを読み取ります。
Parameters:
Returns:
receiveDeferredMessage
public Mono
遅延 ServiceBusReceivedMessageを受け取ります。 遅延メッセージは、シーケンス番号を使用してのみ受信できます。
Parameters:
Returns:
sequenceNumber
を含む遅延メッセージ。receiveDeferredMessages
public Flux
遅延 ServiceBusReceivedMessageのバッチを受け取ります。 遅延メッセージは、シーケンス番号を使用してのみ受信できます。
Parameters:
Returns:
receiveMessages
public Flux
Service Bus エンティティから のServiceBusReceivedMessage無限ストリームを受信します。 この Flux は、次のいずれかになるまで、Service Bus エンティティからメッセージを継続的に受信します。
- 受信側が閉じています。
- Flux のサブスクリプションは破棄されます。
- ダウンストリーム サブスクライバーからのターミナル信号がアップストリームに伝達されます (つまり、 Flux#take(long)、Flux#take(Duration))。
- AmqpException受信リンクが停止する原因となる が発生します。
クライアントは、その下にある AMQP リンクを使用してメッセージを受信します。現在の AMQP リンクで再トリable エラーが発生した場合、クライアントは透過的に新しい AMQP リンクに移行します。 クライアントで再試行できないエラーが発生した場合、または再試行が終了すると、サブスクライバーの org.reactivestreams.Subscriber#onError(Throwable) ターミナル ハンドラーにこのエラーが通知されます。 ターミナル イベントの後に org.reactivestreams.Subscriber#onNext(Object) それ以上のメッセージは配信されません。アプリケーションは、受信を再開するために新しいクライアントを作成する必要があります。 古いクライアントの Flux を再サブスクライブしても効果はありません。
注: 再取得できないエラーのいくつかの例は、アプリケーションが存在しないキューへの接続を試みている、受信中にキューを削除または無効にする、ユーザーが Geo-DR を明示的に開始するなどです。 これらは、Service Bus がクライアントと通信して、再取得不可能なエラーが発生した特定のイベントです。
Returns:
renewMessageLock
public Mono
メッセージのロックを非同期的に更新します。 ロックは、エンティティで指定された設定に基づいて更新されます。 メッセージがモードで受信されると、メッセージはこの受信側インスタンスのサーバー上で PEEK_LOCK 、エンティティの作成中に指定された期間ロックされます (LockDuration)。 メッセージの処理にこの期間より長い時間が必要な場合は、ロックを更新する必要があります。 更新ごとに、ロックはエンティティの LockDuration 値にリセットされます。
Parameters:
Returns:
renewMessageLock
public Mono
の自動ロック更新を ServiceBusReceivedMessage開始します。
Parameters:
Returns:
maxLockRenewalDuration
する Mono。renewSessionLock
public Mono
このレシーバーがセッション レシーバーの場合は、セッション ロックを更新します。
Returns:
renewSessionLock
public Mono
このレシーバーが動作するセッションの自動ロック更新を開始します。
Parameters:
Returns:
rollbackTransaction
public Mono
指定されたトランザクションと、それに関連付けられているすべての操作をロールバックします。
トランザクションの作成と使用
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Parameters:
Returns:
setSessionState
public Mono
受信側が動作するセッションの状態を設定します。
Parameters:
Returns:
適用対象
Azure SDK for Java