Delen via


ServiceBusSessionReceiverAsyncClient Class

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient

Implements

public final class ServiceBusSessionReceiverAsyncClient
implements AutoCloseable

This asynchronous session receiver client is used to acquire session locks from a queue or topic and create ServiceBusReceiverAsyncClient instances that are tied to the locked sessions. Sessions can be used as a first in, first out (FIFO) processing of messages. Queues and topics/subscriptions support Service Bus sessions, however, it must be enabled at the time of entity creation.

The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation".

Sample: Receive messages from a specific session

Use acceptSession(String sessionId) to acquire the lock of a session if you know the session id. PEEK_LOCK and disableAutoComplete() are strongly recommended so users have control over message settlement.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

Sample: Receive messages from the first available session

Use acceptNextSession() to acquire the lock of the next available session without specifying the session id.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

Method Summary

Modifier and Type Method and Description
Mono<ServiceBusReceiverAsyncClient> acceptNextSession()

Acquires a session lock for the next available session and creates a ServiceBusReceiverAsyncClient to receive messages from the session.

Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId)

Acquires a session lock for sessionId and create a ServiceBusReceiverAsyncClient to receive messages from the session.

void close()

Methods inherited from java.lang.Object

Method Details

acceptNextSession

public Mono acceptNextSession()

Acquires a session lock for the next available session and creates a ServiceBusReceiverAsyncClient to receive messages from the session. It will wait until a session is available if none is immediately available.

Returns:

A ServiceBusReceiverAsyncClient that is tied to the available session.

acceptSession

public Mono acceptSession(String sessionId)

Acquires a session lock for sessionId and create a ServiceBusReceiverAsyncClient to receive messages from the session. If the session is already locked by another client, an AmqpException is thrown.

Parameters:

sessionId - The session id.

Returns:

A ServiceBusReceiverAsyncClient that is tied to the specified session.

close

public void close()

Applies to