ServiceBusReceiverAsyncClient Class
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusReceiverAsyncClient
- com.
Implements
public final class ServiceBusReceiverAsyncClient
implements AutoCloseable
An asynchronous receiver responsible for receiving ServiceBusReceivedMessage from an Azure Service Bus queue or topic/subscription.
The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation".
Sample: Creating a ServiceBusReceiverAsyncClient
The following code sample demonstrates the creation of the asynchronous client ServiceBusReceiverAsyncClient. The fullyQualifiedNamespace
is the Service Bus namespace's host name. It is listed under the "Essentials" panel after navigating to the Event Hubs Namespace via Azure Portal. The credential used is DefaultAzureCredential
because it combines commonly used credentials in deployment and development and chooses the credential to used based on its running environment. PEEK_LOCK (the default receive mode) and disableAutoComplete() are strongly recommended so users have control over message settlement.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.receiver()
.disableAutoComplete()
.queueName(queueName)
.buildAsyncClient();
// When users are done with the receiver, dispose of the receiver.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
asyncReceiver.close();
Sample: Receive all messages from Service Bus resource
This returns an infinite stream of messages from Service Bus. The stream ends when the subscription is disposed or other terminal scenarios. See receiveMessages() for more information.
// Keep a reference to `subscription`. When the program is finished receiving messages, call
// subscription.dispose(). This will stop fetching messages from the Service Bus.
// Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
Disposable subscription = asyncReceiver.receiveMessages()
.flatMap(message -> {
System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
System.out.printf("Contents of message as string: %s%n", message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return asyncReceiver.complete(message);
} else {
return asyncReceiver.abandon(message);
}
})
.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
// When program ends, or you're done receiving all messages, dispose of the receiver.
// Clients should be long-lived objects as they
// require resources and time to establish a connection to the service.
asyncReceiver.close();
Sample: Receive messages in RECEIVE_AND_DELETE mode from a Service Bus entity
The following code sample demonstrates the creation of the asynchronous client ServiceBusReceiverAsyncClient using RECEIVE_AND_DELETE. The fullyQualifiedNamespace
is the Service Bus namespace's host name. It is listed under the "Essentials" panel after navigating to the Event Hubs Namespace via Azure Portal. The credential used is DefaultAzureCredential
because it combines commonly used credentials in deployment and development and chooses the credential to used based on its running environment. See RECEIVE_AND_DELETE docs for more information about receiving messages using this mode.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// Keep a reference to `subscription`. When the program is finished receiving messages, call
// subscription.dispose(). This will stop fetching messages from the Service Bus.
Disposable subscription = Flux.usingWhen(
Mono.fromCallable(() -> {
// Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
// peek lock mode is used. In peek lock mode, users are responsible for settling messages.
return new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.receiver()
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.queueName(queueName)
.buildAsyncClient();
}), receiver -> {
return receiver.receiveMessages();
}, receiver -> {
return Mono.fromRunnable(() -> receiver.close());
})
.subscribe(message -> {
// Messages received in RECEIVE_AND_DELETE mode do not have to be settled because they are automatically
// removed from the queue.
System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
System.out.printf("Contents of message as string: %s%n", message.getBody());
},
error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
Sample: Receive messages from a specific session
To fetch messages from a specific session, switch to ServiceBusSessionReceiverClientBuilder and build the session receiver client. Use acceptSession(String sessionId) to create a session-bound ServiceBusReceiverAsyncClient. The sample assumes that Service Bus sessions were enabled at the time of the queue creation.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
// successfully locked.
// `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
// operations complete.
// `Mono.usingWhen` can also be used if the resource closure returns a single item.
Flux<Void> sessionMessages = Flux.usingWhen(
sessionReceiver.acceptSession("<<my-session-id>>"),
receiver -> {
// Receive messages from <<my-session-id>> session.
return receiver.receiveMessages().flatMap(message -> {
System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
});
},
receiver -> Mono.fromRunnable(() -> {
// Dispose of resources.
receiver.close();
sessionReceiver.close();
}));
// When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
// is non-blocking and kicks off the operation.
Disposable subscription = sessionMessages.subscribe(
unused -> {
}, error -> System.err.print("Error receiving message from session: " + error),
() -> System.out.println("Completed receiving from session."));
Sample: Receive messages from the first available session
To process messages from the first available session, switch to ServiceBusSessionReceiverClientBuilder and build the session receiver client. Use acceptNextSession() to find the first available session to process messages from.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// Creates a client to receive messages from the first available session. It waits until
// AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
// completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
receiver -> receiver.receiveMessages().flatMap(message -> {
System.out.println("Received message: " + message.getBody());
// Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
}),
receiver -> Mono.fromRunnable(() -> {
// Dispose of the receiver and sessionReceiver when done receiving messages.
receiver.close();
sessionReceiver.close();
}));
// This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
// operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
// receiving messages.
Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
Sample: Rate limiting consumption of messages from a Service Bus entity
For message receivers that need to limit the number of messages they receive at a given time, they can use BaseSubscriber#request(long).
// This is a non-blocking call. The program will move to the next line of code after setting up the operation.
asyncReceiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
private static final int NUMBER_OF_MESSAGES = 5;
private final AtomicInteger currentNumberOfMessages = new AtomicInteger();
@Override
protected void hookOnSubscribe(Subscription subscription) {
// Tell the Publisher we only want 5 message at a time.
request(NUMBER_OF_MESSAGES);
}
@Override
protected void hookOnNext(ServiceBusReceivedMessage message) {
// Process the ServiceBusReceivedMessage
// If the number of messages we have currently received is a multiple of 5, that means we have reached
// the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
// that the subscriber is ready to get more messages from upstream.
if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
request(NUMBER_OF_MESSAGES);
}
}
});
Method Summary
Methods inherited from java.lang.Object
Method Details
abandon
public Mono
Abandons a ServiceBusReceivedMessage. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
Parameters:
Returns:
abandon
public Mono
Abandons a ServiceBusReceivedMessage updates the message's properties. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
Parameters:
Returns:
close
public void close()
Disposes of the consumer by closing the underlying links to the service.
commitTransaction
public Mono
Commits the transaction and all the operations associated with it.
Creating and using a transaction
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Parameters:
Returns:
complete
public Mono
Completes a ServiceBusReceivedMessage. This will delete the message from the service.
Parameters:
Returns:
complete
public Mono
Completes a ServiceBusReceivedMessage with the given options. This will delete the message from the service.
Parameters:
Returns:
createTransaction
public Mono
Starts a new service side transaction. The ServiceBusTransactionContext should be passed to all operations that needs to be in this transaction.
Creating and using a transaction
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Returns:
deadLetter
public Mono
Moves a ServiceBusReceivedMessage to the dead-letter sub-queue.
Parameters:
Returns:
deadLetter
public Mono
Moves a ServiceBusReceivedMessage to the dead-letter sub-queue with the given options.
Parameters:
Returns:
defer
public Mono
Defers a ServiceBusReceivedMessage. This will move message into the deferred sub-queue.
Parameters:
Returns:
defer
public Mono
Defers a ServiceBusReceivedMessage with the options set. This will move message into the deferred sub-queue.
Parameters:
Returns:
getEntityPath
public String getEntityPath()
Gets the Service Bus resource this client interacts with.
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
Gets the fully qualified Service Bus namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net
.
Returns:
getIdentifier
public String getIdentifier()
Gets the identifier of the instance of ServiceBusReceiverAsyncClient.
Returns:
getSessionId
public String getSessionId()
Gets the SessionId of the session if this receiver is a session receiver.
Returns:
getSessionState
public Mono
Gets the state of the session if this receiver is a session receiver.
Returns:
peekMessage
public Mono
Reads the next active message without changing the state of the receiver or the message source. The first call to peek()
fetches the first active message for this receiver. Each subsequent call fetches the subsequent message in the entity.
Returns:
peekMessage
public Mono
Starting from the given sequence number, reads next the active message without changing the state of the receiver or the message source.
Parameters:
Returns:
peekMessages
public Flux
Reads the next batch of active messages without changing the state of the receiver or the message source.
Parameters:
Returns:
peekMessages
public Flux
Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source.
Parameters:
Returns:
receiveDeferredMessage
public Mono
Receives a deferred ServiceBusReceivedMessage. Deferred messages can only be received by using sequence number.
Parameters:
Returns:
sequenceNumber
.receiveDeferredMessages
public Flux
Receives a batch of deferred ServiceBusReceivedMessage. Deferred messages can only be received by using sequence number.
Parameters:
Returns:
receiveMessages
public Flux
Receives an infinite stream of ServiceBusReceivedMessage from the Service Bus entity. This Flux continuously receives messages from a Service Bus entity until either:
- The receiver is closed.
- The subscription to the Flux is disposed.
- A terminal signal from a downstream subscriber is propagated upstream (ie. Flux#take(long) or Flux#take(Duration)).
- An AmqpException occurs that causes the receive link to stop.
The client uses an AMQP link underneath to receive the messages; the client will transparently transition to a new AMQP link if the current one encounters a retriable error. When the client experiences a non-retriable error or exhausts the retries, the Subscriber's org.reactivestreams.Subscriber#onError(Throwable) terminal handler will be notified with this error. No further messages will be delivered to org.reactivestreams.Subscriber#onNext(Object) after the terminal event; the application must create a new client to resume the receive. Re-subscribing to the Flux of the old client will have no effect.
Note: A few examples of non-retriable errors are - the application attempting to connect to a queue that does not exist, deleting or disabling the queue in the middle of receiving, the user explicitly initiating Geo-DR. These are certain events where the Service Bus communicates to the client that a non-retriable error occurred.
Returns:
renewMessageLock
public Mono
Asynchronously renews the lock on the message. The lock will be renewed based on the setting specified on the entity. When a message is received in PEEK_LOCK mode, the message is locked on the server for this receiver instance for a duration as specified during the entity creation (LockDuration). If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is reset to the entity's LockDuration value.
Parameters:
Returns:
renewMessageLock
public Mono
Starts the auto lock renewal for a ServiceBusReceivedMessage.
Parameters:
Returns:
maxLockRenewalDuration
.renewSessionLock
public Mono
Renews the session lock if this receiver is a session receiver.
Returns:
renewSessionLock
public Mono
Starts the auto lock renewal for the session this receiver works for.
Parameters:
Returns:
rollbackTransaction
public Mono
Rollbacks the transaction given and all operations associated with it.
Creating and using a transaction
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Parameters:
Returns:
setSessionState
public Mono
Sets the state of the session this receiver works for.
Parameters:
Returns: