共用方式為


ServiceBusReceiverAsyncClient Class

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient

Implements

public final class ServiceBusReceiverAsyncClient
implements AutoCloseable

An asynchronous receiver responsible for receiving ServiceBusReceivedMessage from an Azure Service Bus queue or topic/subscription.

The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation".

Sample: Creating a ServiceBusReceiverAsyncClient

The following code sample demonstrates the creation of the asynchronous client ServiceBusReceiverAsyncClient. The fullyQualifiedNamespace is the Service Bus namespace's host name. It is listed under the "Essentials" panel after navigating to the Event Hubs Namespace via Azure Portal. The credential used is DefaultAzureCredential because it combines commonly used credentials in deployment and development and chooses the credential to used based on its running environment. PEEK_LOCK (the default receive mode) and disableAutoComplete() are strongly recommended so users have control over message settlement.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .receiver()
     .disableAutoComplete()
     .queueName(queueName)
     .buildAsyncClient();

 // When users are done with the receiver, dispose of the receiver.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 asyncReceiver.close();

Sample: Receive all messages from Service Bus resource

This returns an infinite stream of messages from Service Bus. The stream ends when the subscription is disposed or other terminal scenarios. See receiveMessages() for more information.

// Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 // Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
 Disposable subscription = asyncReceiver.receiveMessages()
     .flatMap(message -> {
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());

         // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return asyncReceiver.complete(message);
         } else {
             return asyncReceiver.abandon(message);
         }
     })
     .subscribe(unused -> {
     }, error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

 // When program ends, or you're done receiving all messages, dispose of the receiver.
 // Clients should be long-lived objects as they
 // require resources and time to establish a connection to the service.
 asyncReceiver.close();

Sample: Receive messages in RECEIVE_AND_DELETE mode from a Service Bus entity

The following code sample demonstrates the creation of the asynchronous client ServiceBusReceiverAsyncClient using RECEIVE_AND_DELETE. The fullyQualifiedNamespace is the Service Bus namespace's host name. It is listed under the "Essentials" panel after navigating to the Event Hubs Namespace via Azure Portal. The credential used is DefaultAzureCredential because it combines commonly used credentials in deployment and development and chooses the credential to used based on its running environment. See RECEIVE_AND_DELETE docs for more information about receiving messages using this mode.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 Disposable subscription = Flux.usingWhen(
         Mono.fromCallable(() -> {
             // Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
             // peek lock mode is used. In peek lock mode, users are responsible for settling messages.
             return new ServiceBusClientBuilder()
                 .credential(fullyQualifiedNamespace, credential)
                 .receiver()
                 .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                 .queueName(queueName)
                 .buildAsyncClient();
         }), receiver -> {
             return receiver.receiveMessages();
         }, receiver -> {
             return Mono.fromRunnable(() -> receiver.close());
         })
     .subscribe(message -> {
             // Messages received in RECEIVE_AND_DELETE mode do not have to be settled because they are automatically
             // removed from the queue.
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());
     },
         error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

Sample: Receive messages from a specific session

To fetch messages from a specific session, switch to ServiceBusSessionReceiverClientBuilder and build the session receiver client. Use acceptSession(String sessionId) to create a session-bound ServiceBusReceiverAsyncClient. The sample assumes that Service Bus sessions were enabled at the time of the queue creation.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

Sample: Receive messages from the first available session

To process messages from the first available session, switch to ServiceBusSessionReceiverClientBuilder and build the session receiver client. Use acceptNextSession() to find the first available session to process messages from.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

Sample: Rate limiting consumption of messages from a Service Bus entity

For message receivers that need to limit the number of messages they receive at a given time, they can use BaseSubscriber#request(long).

// This is a non-blocking call. The program will move to the next line of code after setting up the operation.
 asyncReceiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
     private static final int NUMBER_OF_MESSAGES = 5;
     private final AtomicInteger currentNumberOfMessages = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 message at a time.
         request(NUMBER_OF_MESSAGES);
     }

     @Override
     protected void hookOnNext(ServiceBusReceivedMessage message) {
         // Process the ServiceBusReceivedMessage
         // If the number of messages we have currently received is a multiple of 5, that means we have reached
         // the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
         // that the subscriber is ready to get more messages from upstream.
         if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
             request(NUMBER_OF_MESSAGES);
         }
     }
 });

Method Summary

Modifier and Type Method and Description
Mono<Void> abandon(ServiceBusReceivedMessage message)

Abandons a ServiceBusReceivedMessage.

Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options)

Abandons a ServiceBusReceivedMessage updates the message's properties.

void close()

Disposes of the consumer by closing the underlying links to the service.

Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext)

Commits the transaction and all the operations associated with it.

Mono<Void> complete(ServiceBusReceivedMessage message)

Completes a ServiceBusReceivedMessage.

Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options)

Completes a ServiceBusReceivedMessage with the given options.

Mono<ServiceBusTransactionContext> createTransaction()

Starts a new service side transaction.

Mono<Void> deadLetter(ServiceBusReceivedMessage message)

Moves a ServiceBusReceivedMessage to the dead-letter sub-queue.

Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

Moves a ServiceBusReceivedMessage to the dead-letter sub-queue with the given options.

Mono<Void> defer(ServiceBusReceivedMessage message)

Defers a ServiceBusReceivedMessage.

Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options)

Defers a ServiceBusReceivedMessage with the options set.

String getEntityPath()

Gets the Service Bus resource this client interacts with.

String getFullyQualifiedNamespace()

Gets the fully qualified Service Bus namespace that the connection is associated with.

String getIdentifier()

Gets the identifier of the instance of ServiceBusReceiverAsyncClient.

String getSessionId()

Gets the SessionId of the session if this receiver is a session receiver.

Mono<byte[]> getSessionState()

Gets the state of the session if this receiver is a session receiver.

Mono<ServiceBusReceivedMessage> peekMessage()

Reads the next active message without changing the state of the receiver or the message source.

Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber)

Starting from the given sequence number, reads next the active message without changing the state of the receiver or the message source.

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages)

Reads the next batch of active messages without changing the state of the receiver or the message source.

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)

Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source.

Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber)

Receives a deferred ServiceBusReceivedMessage.

Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers)

Receives a batch of deferred ServiceBusReceivedMessage.

Flux<ServiceBusReceivedMessage> receiveMessages()

Receives an infinite stream of ServiceBusReceivedMessage from the Service Bus entity.

Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)

Asynchronously renews the lock on the message.

Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

Starts the auto lock renewal for a ServiceBusReceivedMessage.

Mono<OffsetDateTime> renewSessionLock()

Renews the session lock if this receiver is a session receiver.

Mono<Void> renewSessionLock(Duration maxLockRenewalDuration)

Starts the auto lock renewal for the session this receiver works for.

Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext)

Rollbacks the transaction given and all operations associated with it.

Mono<Void> setSessionState(byte[] sessionState)

Sets the state of the session this receiver works for.

Methods inherited from java.lang.Object

Method Details

abandon

public Mono abandon(ServiceBusReceivedMessage message)

Abandons a ServiceBusReceivedMessage. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.

Returns:

A Mono that completes when the Service Bus abandon operation completes.

abandon

public Mono abandon(ServiceBusReceivedMessage message, AbandonOptions options)

Abandons a ServiceBusReceivedMessage updates the message's properties. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.
options - The options to set while abandoning the message.

Returns:

A Mono that completes when the Service Bus operation finishes.

close

public void close()

Disposes of the consumer by closing the underlying links to the service.

commitTransaction

public Mono commitTransaction(ServiceBusTransactionContext transactionContext)

Commits the transaction and all the operations associated with it.

Creating and using a transaction

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - The transaction to be commit.

Returns:

The Mono that finishes this operation on service bus resource.

complete

public Mono complete(ServiceBusReceivedMessage message)

Completes a ServiceBusReceivedMessage. This will delete the message from the service.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.

Returns:

A Mono that finishes when the message is completed on Service Bus.

complete

public Mono complete(ServiceBusReceivedMessage message, CompleteOptions options)

Completes a ServiceBusReceivedMessage with the given options. This will delete the message from the service.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.
options - Options used to complete the message.

Returns:

A Mono that finishes when the message is completed on Service Bus.

createTransaction

public Mono createTransaction()

Starts a new service side transaction. The ServiceBusTransactionContext should be passed to all operations that needs to be in this transaction.

Creating and using a transaction

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Returns:

The Mono that finishes this operation on service bus resource.

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message)

Moves a ServiceBusReceivedMessage to the dead-letter sub-queue.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.

Returns:

A Mono that completes when the dead letter operation finishes.

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

Moves a ServiceBusReceivedMessage to the dead-letter sub-queue with the given options.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.
options - Options used to dead-letter the message.

Returns:

A Mono that completes when the dead letter operation finishes.

defer

public Mono defer(ServiceBusReceivedMessage message)

Defers a ServiceBusReceivedMessage. This will move message into the deferred sub-queue.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.

Returns:

A Mono that completes when the Service Bus defer operation finishes.

defer

public Mono defer(ServiceBusReceivedMessage message, DeferOptions options)

Defers a ServiceBusReceivedMessage with the options set. This will move message into the deferred sub-queue.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.
options - Options used to defer the message.

Returns:

A Mono that completes when the defer operation finishes.

getEntityPath

public String getEntityPath()

Gets the Service Bus resource this client interacts with.

Returns:

The Service Bus resource this client interacts with.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Gets the fully qualified Service Bus namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.

Returns:

The fully qualified Service Bus namespace that the connection is associated with.

getIdentifier

public String getIdentifier()

Gets the identifier of the instance of ServiceBusReceiverAsyncClient.

Returns:

The identifier that can identify the instance of ServiceBusReceiverAsyncClient.

getSessionId

public String getSessionId()

Gets the SessionId of the session if this receiver is a session receiver.

Returns:

The SessionId or null if this is not a session receiver.

getSessionState

public Mono getSessionState()

Gets the state of the session if this receiver is a session receiver.

Returns:

The session state or an empty Mono if there is no state set for the session.

peekMessage

public Mono peekMessage()

Reads the next active message without changing the state of the receiver or the message source. The first call to peek() fetches the first active message for this receiver. Each subsequent call fetches the subsequent message in the entity.

Returns:

peekMessage

public Mono peekMessage(long sequenceNumber)

Starting from the given sequence number, reads next the active message without changing the state of the receiver or the message source.

Parameters:

sequenceNumber - The sequence number from where to read the message.

Returns:

peekMessages

public Flux peekMessages(int maxMessages)

Reads the next batch of active messages without changing the state of the receiver or the message source.

Parameters:

maxMessages - The number of messages.

Returns:

A Flux of ServiceBusReceivedMessage that are peeked.

peekMessages

public Flux peekMessages(int maxMessages, long sequenceNumber)

Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source.

Parameters:

maxMessages - The number of messages.
sequenceNumber - The sequence number from where to start reading messages.

Returns:

A Flux of ServiceBusReceivedMessage peeked.

receiveDeferredMessage

public Mono receiveDeferredMessage(long sequenceNumber)

Receives a deferred ServiceBusReceivedMessage. Deferred messages can only be received by using sequence number.

Parameters:

sequenceNumber - The getSequenceNumber() of the message.

Returns:

A deferred message with the matching sequenceNumber.

receiveDeferredMessages

public Flux receiveDeferredMessages(Iterable sequenceNumbers)

Receives a batch of deferred ServiceBusReceivedMessage. Deferred messages can only be received by using sequence number.

Parameters:

sequenceNumbers - The sequence numbers of the deferred messages.

Returns:

A Flux of deferred ServiceBusReceivedMessage.

receiveMessages

public Flux receiveMessages()

Receives an infinite stream of ServiceBusReceivedMessage from the Service Bus entity. This Flux continuously receives messages from a Service Bus entity until either:

  • The receiver is closed.
  • The subscription to the Flux is disposed.
  • A terminal signal from a downstream subscriber is propagated upstream (ie. Flux#take(long) or Flux#take(Duration)).
  • An AmqpException occurs that causes the receive link to stop.

The client uses an AMQP link underneath to receive the messages; the client will transparently transition to a new AMQP link if the current one encounters a retriable error. When the client experiences a non-retriable error or exhausts the retries, the Subscriber's org.reactivestreams.Subscriber#onError(Throwable) terminal handler will be notified with this error. No further messages will be delivered to org.reactivestreams.Subscriber#onNext(Object) after the terminal event; the application must create a new client to resume the receive. Re-subscribing to the Flux of the old client will have no effect.

Note: A few examples of non-retriable errors are - the application attempting to connect to a queue that does not exist, deleting or disabling the queue in the middle of receiving, the user explicitly initiating Geo-DR. These are certain events where the Service Bus communicates to the client that a non-retriable error occurred.

Returns:

An infinite stream of messages from the Service Bus entity.

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message)

Asynchronously renews the lock on the message. The lock will be renewed based on the setting specified on the entity. When a message is received in PEEK_LOCK mode, the message is locked on the server for this receiver instance for a duration as specified during the entity creation (LockDuration). If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is reset to the entity's LockDuration value.

Parameters:

message - The ServiceBusReceivedMessage to perform auto-lock renewal.

Returns:

The new expiration time for the message.

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

Starts the auto lock renewal for a ServiceBusReceivedMessage.

Parameters:

message - The ServiceBusReceivedMessage to perform this operation.
maxLockRenewalDuration - Maximum duration to keep renewing the lock token.

Returns:

A Mono that completes when the message renewal operation has completed up until maxLockRenewalDuration.

renewSessionLock

public Mono renewSessionLock()

Renews the session lock if this receiver is a session receiver.

Returns:

The next expiration time for the session lock.

renewSessionLock

public Mono renewSessionLock(Duration maxLockRenewalDuration)

Starts the auto lock renewal for the session this receiver works for.

Parameters:

maxLockRenewalDuration - Maximum duration to keep renewing the session lock.

Returns:

A lock renewal operation for the message.

rollbackTransaction

public Mono rollbackTransaction(ServiceBusTransactionContext transactionContext)

Rollbacks the transaction given and all operations associated with it.

Creating and using a transaction

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - The transaction to rollback.

Returns:

The Mono that finishes this operation on service bus resource.

setSessionState

public Mono setSessionState(byte[] sessionState)

Sets the state of the session this receiver works for.

Parameters:

sessionState - State to set on the session.

Returns:

A Mono that completes when the session is set

Applies to