次の方法で共有


ServiceBusSessionReceiverAsyncClient クラス

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient

実装

public final class ServiceBusSessionReceiverAsyncClient
implements AutoCloseable

この 非同期 セッション レシーバー クライアントは、キューまたはトピックからセッション ロックを取得し、ロックされたセッションに関連付けられているインスタンスを作成 ServiceBusReceiverAsyncClient するために使用されます。 セッションは、メッセージの先入れ先出し (FIFO) 処理として使用できます。 キューとトピック/サブスクリプションは Service Bus セッションをサポートしていますが、 エンティティの作成時に有効にする必要があります。

このドキュメントに示す例では、認証に DefaultAzureCredential という名前の資格情報オブジェクトを使用します。これは、ローカルの開発環境や運用環境など、ほとんどのシナリオに適しています。 さらに、運用環境での認証には マネージド ID を 使用することをお勧めします。 さまざまな認証方法と、それに対応する資格情報の種類の詳細については、 Azure Identity のドキュメントを参照してください

サンプル: 特定のセッションからメッセージを受信する

セッション ID PEEK_LOCKdisableAutoComplete() がわかっている場合は、 を使用してacceptSession(String sessionId)セッションのロックを取得します。ユーザーがメッセージ決済を制御できるように強くお勧めします。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

サンプル: 使用可能な最初のセッションからメッセージを受信する

セッション ID を指定せずに、次に使用可能なセッションのロックを取得するには、 を使用 acceptNextSession() します。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

メソッドの概要

修飾子と型 メソッドと説明
Mono<ServiceBusReceiverAsyncClient> acceptNextSession()

次に使用可能なセッションのセッション ロックを取得し、セッションからメッセージを受信する を作成 ServiceBusReceiverAsyncClient します。

Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId)

のセッション ロック sessionId を取得し、セッションからメッセージを受信する を ServiceBusReceiverAsyncClient 作成します。

void close()

メソッドの継承元: java.lang.Object

メソッドの詳細

acceptNextSession

public Mono acceptNextSession()

次に使用可能なセッションのセッション ロックを取得し、セッションからメッセージを受信する を作成 ServiceBusReceiverAsyncClient します。 すぐに利用できない場合は、セッションが使用可能になるまで待機します。

Returns:

ServiceBusReceiverAsyncClient使用可能なセッションに関連付けられている 。

acceptSession

public Mono acceptSession(String sessionId)

のセッション ロック sessionId を取得し、セッションからメッセージを受信する を ServiceBusReceiverAsyncClient 作成します。 セッションが既に別のクライアントによってロックされている場合は、 AmqpException がスローされます。

Parameters:

sessionId - セッション ID。

Returns:

ServiceBusReceiverAsyncClient指定したセッションに関連付けられている 。

close

public void close()

適用対象