ServiceBusReceiverClient Class
- java.lang.Object
  - com.azure.messaging.servicebus.ServiceBusReceiverClient
Implements
AutoCloseable
public final class ServiceBusReceiverClient
implements AutoCloseable
A synchronous receiver responsible for receiving ServiceBusReceivedMessage from a queue or topic/subscription on Azure Service Bus.
The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation.
Sample: Create a receiver and receive messages
The following code sample demonstrates the creation and use of the synchronous client ServiceBusReceiverClient to receive messages from a Service Bus subscription. The receive operation returns when either 10 messages are received or 30 seconds have elapsed. By default, messages are received using PEEK_LOCK, and customers must settle their messages using one of the settlement methods on the receiver client. "Settling receive operations" provides additional information about message settlement.
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, tokenCredential)
    .receiver()
    .topicName(topicName)
    .subscriptionName(subscriptionName)
    .buildClient();

// Receives a batch of messages when 10 messages are received or until 30 seconds have elapsed, whichever
// happens first.
IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(10, Duration.ofSeconds(30));
messages.forEach(message -> {
    System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(), message.getBody());

    // If able to process message, complete it. Otherwise, abandon it and allow it to be
    // redelivered.
    if (isMessageProcessed) {
        receiver.complete(message);
    } else {
        receiver.abandon(message);
    }
});

// When program ends, or you're done receiving all messages, dispose of the receiver.
// Clients should be long-lived objects as they
// require resources and time to establish a connection to the service.
receiver.close();
Method Summary
Modifier and Type | Method | Description |
---|---|---|
void | abandon(ServiceBusReceivedMessage message) | Abandons a ServiceBusReceivedMessage. |
void | abandon(ServiceBusReceivedMessage message, AbandonOptions options) | Abandons a ServiceBusReceivedMessage and updates the message's properties. |
void | close() | Disposes of the consumer by closing the underlying links to the service. |
void | commitTransaction(ServiceBusTransactionContext transactionContext) | Commits the transaction and all the operations associated with it. |
void | complete(ServiceBusReceivedMessage message) | Completes a ServiceBusReceivedMessage. |
void | complete(ServiceBusReceivedMessage message, CompleteOptions options) | Completes a ServiceBusReceivedMessage. |
ServiceBusTransactionContext | createTransaction() | Starts a new transaction on Service Bus. |
void | deadLetter(ServiceBusReceivedMessage message) | Moves a ServiceBusReceivedMessage to the dead-letter sub-queue. |
void | deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options) | Moves a ServiceBusReceivedMessage to the dead-letter sub-queue with dead-letter reason, error description, and/or modified properties. |
void | defer(ServiceBusReceivedMessage message) | Defers a ServiceBusReceivedMessage. |
void | defer(ServiceBusReceivedMessage message, DeferOptions options) | Defers a ServiceBusReceivedMessage using its lock token with modified message property. |
String | getEntityPath() | Gets the Service Bus resource this client interacts with. |
String | getFullyQualifiedNamespace() | Gets the fully qualified Service Bus namespace that the connection is associated with. |
String | getIdentifier() | Gets the identifier of the instance of ServiceBusReceiverClient. |
String | getSessionId() | Gets the SessionId of the session if this receiver is a session receiver. |
byte[] | getSessionState() | Gets the state of the session if this receiver is a session receiver. |
ServiceBusReceivedMessage | peekMessage() | Reads the next active message without changing the state of the receiver or the message source. |
ServiceBusReceivedMessage | peekMessage(long sequenceNumber) | Starting from the given sequence number, reads the next active message without changing the state of the receiver or the message source. |
IterableStream<ServiceBusReceivedMessage> | peekMessages(int maxMessages) | Reads the next batch of active messages without changing the state of the receiver or the message source. |
IterableStream<ServiceBusReceivedMessage> | peekMessages(int maxMessages, long sequenceNumber) | Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source. |
ServiceBusReceivedMessage | receiveDeferredMessage(long sequenceNumber) | Receives a deferred ServiceBusReceivedMessage. |
IterableStream<ServiceBusReceivedMessage> | receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers) | Receives a batch of deferred ServiceBusReceivedMessage. |
IterableStream<ServiceBusReceivedMessage> | receiveMessages(int maxMessages) | Receives an iterable stream of ServiceBusReceivedMessage from the Service Bus entity. |
IterableStream<ServiceBusReceivedMessage> | receiveMessages(int maxMessages, Duration maxWaitTime) | Receives an iterable stream of ServiceBusReceivedMessage from the Service Bus entity with a timeout. |
OffsetDateTime | renewMessageLock(ServiceBusReceivedMessage message) | Renews the lock on the specified message. |
void | renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration, Consumer<Throwable> onError) | Starts the auto lock renewal for a message with the given lock. |
OffsetDateTime | renewSessionLock() | Renews the lock on the session that this receiver works for. |
void | renewSessionLock(Duration maxLockRenewalDuration, Consumer<Throwable> onError) | Starts the auto lock renewal for the session that this receiver works for. |
void | rollbackTransaction(ServiceBusTransactionContext transactionContext) | Rolls back the given transaction and all operations associated with it. |
void | setSessionState(byte[] sessionState) | Sets the state of the session if this receiver is a session receiver. |
Methods inherited from java.lang.Object
Method Details
abandon
public void abandon(ServiceBusReceivedMessage message)
Abandons a ServiceBusReceivedMessage. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
Parameters:
abandon
public void abandon(ServiceBusReceivedMessage message, AbandonOptions options)
Abandons a ServiceBusReceivedMessage and updates the message's properties. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
Parameters:
close
public void close()
Disposes of the consumer by closing the underlying links to the service.
commitTransaction
public void commitTransaction(ServiceBusTransactionContext transactionContext)
Commits the transaction and all the operations associated with it.
Creating and using a transaction
ServiceBusTransactionContext transaction = receiver.createTransaction();
// Process messages and associate operations with the transaction.
ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
receiver.commitTransaction(transaction);
Parameters:
complete
public void complete(ServiceBusReceivedMessage message)
Completes a ServiceBusReceivedMessage. This will delete the message from the service.
Parameters:
complete
public void complete(ServiceBusReceivedMessage message, CompleteOptions options)
Completes a ServiceBusReceivedMessage. This will delete the message from the service.
Parameters:
createTransaction
public ServiceBusTransactionContext createTransaction()
Starts a new transaction on Service Bus. The ServiceBusTransactionContext should be passed along to all operations that need to be in this transaction.
Sample: Creating and using a transaction
ServiceBusTransactionContext transaction = receiver.createTransaction();
// Process messages and associate operations with the transaction.
ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
receiver.commitTransaction(transaction);
Returns:
deadLetter
public void deadLetter(ServiceBusReceivedMessage message)
Moves a ServiceBusReceivedMessage to the dead-letter sub-queue.
Parameters:
deadLetter
public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)
Moves a ServiceBusReceivedMessage to the dead-letter sub-queue with dead-letter reason, error description, and/or modified properties.
Parameters:
defer
public void defer(ServiceBusReceivedMessage message)
Defers a ServiceBusReceivedMessage. This will move the message into the deferred sub-queue.
Parameters:
defer
public void defer(ServiceBusReceivedMessage message, DeferOptions options)
Defers a ServiceBusReceivedMessage using its lock token with modified message property. This will move the message into the deferred sub-queue.
Parameters:
getEntityPath
public String getEntityPath()
Gets the Service Bus resource this client interacts with.
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
Gets the fully qualified Service Bus namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.
Returns:
getIdentifier
public String getIdentifier()
Gets the identifier of the instance of ServiceBusReceiverClient.
Returns:
getSessionId
public String getSessionId()
Gets the SessionId of the session if this receiver is a session receiver.
Returns:
getSessionState
public byte[] getSessionState()
Gets the state of the session if this receiver is a session receiver.
Returns:
peekMessage
public ServiceBusReceivedMessage peekMessage()
Reads the next active message without changing the state of the receiver or the message source. The first call to peekMessage() fetches the first active message for this receiver. Each subsequent call fetches the subsequent message in the entity.
Returns:
peekMessage
public ServiceBusReceivedMessage peekMessage(long sequenceNumber)
Starting from the given sequence number, reads the next active message without changing the state of the receiver or the message source.
Parameters:
Returns:
peekMessages
public IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages)
Reads the next batch of active messages without changing the state of the receiver or the message source.
Parameters:
Returns:
peekMessages
public IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)
Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source.
Parameters:
Returns:
receiveDeferredMessage
public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber)
Receives a deferred ServiceBusReceivedMessage. Deferred messages can only be received by using sequence number.
Parameters:
Returns:
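A deferred message with the matching sequenceNumber.
receiveDeferredMessageBatch
public IterableStream<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers)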
Receives a batch of deferred ServiceBusReceivedMessage. Deferred messages can only be received by using sequence number.
Parameters:
Returns:
receiveMessages
public IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages)
Receives an iterable stream of ServiceBusReceivedMessage from the Service Bus entity. By default, the receive operation waits up to 1 minute for a message before it times out. You can override this by using receiveMessages(int maxMessages, Duration maxWaitTime).
The 1-minute timeout is a client-side feature. Each time the application calls receiveMessages, a timer is started on the client; when it expires, it terminates the IterableStream returned from this method. Because the timeout is a client-side feature, it is impossible to cancel any message requests that have already reached the broker. Messages can still arrive in the background after the IterableStream has transitioned to the terminated state due to the client-side timeout. If there is no active IterableStream, the client attempts to release any buffered messages back to the broker to keep them from going to the dead-letter queue. If a new active IterableStream appears (due to a new receiveMessages call) while messages are being released, the client stops releasing further messages. The application may then receive some messages from the buffer or already in transit, followed by previously released messages once the broker redelivers them, which can appear as out-of-order delivery.
To keep the lock on each message received from a non-session resource (queue, topic subscription), the client runs a background task that continuously renews the lock before it expires. By default, the lock renewal task runs for 5 minutes. This duration can be adjusted using the ServiceBusReceiverClientBuilder#maxAutoLockRenewDuration(Duration) API, or renewal can be turned off by setting it to Duration#ZERO. A higher maxMessages value means an equivalent number of lock renewal tasks running in the client, which may put more stress on low-CPU environments. Because each lock renewal is a network call to the broker, a high number of lock renewal tasks making multiple renewal calls may also contribute to namespace throttling. Additionally, if certain lock renewal tasks fail to renew the lock on time because of low CPU, service throttling, or an overloaded network, the client may lose the lock on the messages, which causes the application's attempts to settle (e.g., complete, abandon) those messages to fail. The broker will redeliver those messages, but if the settling attempts fail repeatedly beyond the max delivery count, the message will be transferred to the dead-letter queue. Keep this in mind when choosing maxMessages. You may consider disabling client-side lock renewal using maxAutoLockRenewDuration(Duration.ZERO) if you can configure a lock duration at the resource (queue, topic subscription) level that at least exceeds the cumulative expected processing time for maxMessages messages.
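The sizing rule for the entity's lock duration comes down to simple arithmetic. The following is a minimal sketch, assuming the messages of a batch are processed one at a time and that a per-message processing estimate is available. LockDurationEstimate, minimumLockDuration, and the sample figures are hypothetical and are not part of the Service Bus SDK.

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not part of the Azure SDK. */
final class LockDurationEstimate {
    private LockDurationEstimate() { }

    /**
     * Returns the cumulative expected processing time for one batch, assuming the
     * messages are processed sequentially. The lock duration configured on the
     * entity would need to exceed this value before client-side renewal is disabled.
     */
    static Duration minimumLockDuration(int maxMessages, Duration perMessageProcessingTime) {
        if (maxMessages <= 0) {
            throw new IllegalArgumentException("maxMessages must be positive.");
        }
        return perMessageProcessingTime.multipliedBy(maxMessages);
    }

    public static void main(String[] args) {
        // Example figures only: 10 messages per batch, roughly 5 seconds each.
        Duration needed = minimumLockDuration(10, Duration.ofSeconds(5));
        System.out.println("Entity lock duration should exceed: " + needed);
    }
}
```

If the result is larger than the lock duration configured on the entity, lowering maxMessages or leaving automatic lock renewal enabled is likely the safer choice.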
The client uses an AMQP link underneath to receive the messages; the client will transparently transition to a new AMQP link if the current one encounters a retriable error. When the client experiences a non-retriable error or exhausts the retries, the iteration (e.g., forEach) on the IterableStream<T> returned by further invocations of the receiveMessages API will throw the error to the application. Once the application receives this error, it should reset the client, i.e., close the current ServiceBusReceiverClient and create a new client to continue receiving messages.
Note: A few examples of non-retriable errors are: the application attempting to connect to a queue that does not exist, the queue being deleted or disabled in the middle of receiving, and the user explicitly initiating Geo-DR. These are events where Service Bus communicates to the client that a non-retriable error occurred.
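The reset step (close the current client, then create a new one) can be sketched generically. The example below is a minimal illustration, assuming the application builds its client through a factory; ResettableClient and its methods are hypothetical names and are not part of the Service Bus SDK. For a ServiceBusReceiverClient, the factory would typically be the builder chain that ends in buildClient().

```java
import java.util.function.Supplier;

/** Hypothetical wrapper for illustration only; not part of the Azure SDK. */
final class ResettableClient<C extends AutoCloseable> implements AutoCloseable {
    private final Supplier<C> factory;
    private C current;

    ResettableClient(Supplier<C> factory) {
        this.factory = factory;
        this.current = factory.get();
    }

    /** Returns the client instance currently in use. */
    C get() {
        return current;
    }

    /** Closes the current client and replaces it with a newly created one. */
    void reset() {
        try {
            current.close();
        } catch (Exception e) {
            // The old client is already unusable; a failed close should not block recovery.
        }
        current = factory.get();
    }

    @Override
    public void close() throws Exception {
        current.close();
    }
}
```

An application would call reset() after the iteration over the IterableStream throws, and then call receiveMessages again on the client returned by get().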
Parameters:
Returns:
An IterableStream of at most maxMessages messages from the Service Bus entity.
receiveMessages
public IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages, Duration maxWaitTime)
Receives an iterable stream of ServiceBusReceivedMessage from the Service Bus entity with a timeout. The default receive mode is PEEK_LOCK unless it is changed during creation of ServiceBusReceiverClient using ServiceBusReceiverClientBuilder#receiveMode(ServiceBusReceiveMode).
The support for the timeout maxWaitTime is a client-side feature. Each time the application calls receiveMessages, a timer is started on the client; when it expires, it terminates the IterableStream returned from this method. Because the timeout is a client-side feature, it is impossible to cancel any message requests that have already reached the broker. Messages can still arrive in the background after the IterableStream has transitioned to the terminated state due to the client-side timeout. If there is no active IterableStream, the client attempts to release any buffered messages back to the broker to keep them from going to the dead-letter queue. If a new active IterableStream appears (due to a new receiveMessages call) while messages are being released, the client stops releasing further messages. The application may then receive some messages from the buffer or already in transit, followed by previously released messages once the broker redelivers them, which can appear as out-of-order delivery. Consider these effects when choosing the timeout. For example, a small timeout with a higher maxMessages value while there are a lot of messages in the entity can increase the number of release network calls to the broker, which might contribute to namespace throttling and increases the chance of out-of-order delivery. Also, frequent receiveMessages calls with a low timeout mean frequent scheduling of timer tasks, which may put more stress on low-CPU environments.
To keep the lock on each message received from a non-session resource (queue, topic subscription), the client runs a background task that continuously renews the lock before it expires. By default, the lock renewal task runs for 5 minutes. This duration can be adjusted using the ServiceBusReceiverClientBuilder#maxAutoLockRenewDuration(Duration) API, or renewal can be turned off by setting it to Duration#ZERO. A higher maxMessages value means an equivalent number of lock renewal tasks running in the client, which may put more stress on low-CPU environments. Because each lock renewal is a network call to the broker, a high number of lock renewal tasks making multiple renewal calls may also contribute to namespace throttling. Additionally, if certain lock renewal tasks fail to renew the lock on time because of low CPU, service throttling, or an overloaded network, the client may lose the lock on the messages, which causes the application's attempts to settle (e.g., complete, abandon) those messages to fail. The broker will redeliver those messages, but if the settling attempts fail repeatedly beyond the max delivery count, the message will be transferred to the dead-letter queue. Keep this in mind when choosing maxMessages. You may consider disabling client-side lock renewal using maxAutoLockRenewDuration(Duration.ZERO) if you can configure a lock duration at the resource (queue, topic subscription) level that at least exceeds the cumulative expected processing time for maxMessages messages.
The client uses an AMQP link underneath to receive the messages; the client will transparently transition to a new AMQP link if the current one encounters a retriable error. When the client experiences a non-retriable error or exhausts the retries, the iteration (e.g., forEach) on the IterableStream<T> returned by further invocations of the receiveMessages API will throw the error to the application. Once the application receives this error, it should reset the client, i.e., close the current ServiceBusReceiverClient and create a new client to continue receiving messages.
Note: A few examples of non-retriable errors are: the application attempting to connect to a queue that does not exist, the queue being deleted or disabled in the middle of receiving, and the user explicitly initiating Geo-DR. These are events where Service Bus communicates to the client that a non-retriable error occurred.
Parameters:
Returns:
An IterableStream of at most maxMessages messages from the Service Bus entity.
renewMessageLock
public OffsetDateTime renewMessageLock(ServiceBusReceivedMessage message)
Renews the lock on the specified message. The lock is renewed based on the setting specified on the entity. When a message is received in PEEK_LOCK mode, the message is locked on the server for this receiver instance for the duration specified when the queue was created (LockDuration). If processing the message takes longer than this duration, the lock needs to be renewed. For each renewal, the lock is reset to the entity's LockDuration value.
Parameters:
Returns:
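Whether a manual renewal is needed depends on how the lock expiration compares with the remaining work. The following is a minimal sketch of that comparison using only java.time. LockRenewalCheck, shouldRenew, and the safety margin are hypothetical and are not part of the Service Bus SDK. The lockedUntil value is assumed to come from the message's lock expiration (for example, message.getLockedUntil()) or from the OffsetDateTime returned by the most recent renewMessageLock call.

```java
import java.time.Duration;
import java.time.OffsetDateTime;

/** Hypothetical helper for illustration only; not part of the Azure SDK. */
final class LockRenewalCheck {
    private LockRenewalCheck() { }

    /**
     * Returns true when the estimated remaining work, plus a safety margin for the
     * renewal round trip, would finish after the lock expires.
     */
    static boolean shouldRenew(OffsetDateTime lockedUntil, OffsetDateTime now,
                               Duration remainingWork, Duration safetyMargin) {
        OffsetDateTime expectedFinish = now.plus(remainingWork).plus(safetyMargin);
        return expectedFinish.isAfter(lockedUntil);
    }
}
```

When shouldRenew returns true, the application would call receiver.renewMessageLock(message) and use the returned time as the new lockedUntil.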
renewMessageLock
public void renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration, Consumer<Throwable> onError)
Starts the auto lock renewal for a message with the given lock.
Parameters:
renewSessionLock
public OffsetDateTime renewSessionLock()
Renews the lock on the session that this receiver works for, if this receiver is a session receiver.
Returns:
renewSessionLock
public void renewSessionLock(Duration maxLockRenewalDuration, Consumer<Throwable> onError)
Starts the auto lock renewal for the session that this receiver works for.
Parameters:
rollbackTransaction
public void rollbackTransaction(ServiceBusTransactionContext transactionContext)
Rolls back the given transaction and all operations associated with it.
Creating and using a transaction
ServiceBusTransactionContext transaction = receiver.createTransaction();
// Process messages and associate operations with the transaction.
ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
receiver.commitTransaction(transaction);
Parameters:
setSessionState
public void setSessionState(byte[] sessionState)
Sets the state of the session if this receiver is a session receiver.
Parameters:
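Because session state is an opaque byte[], the application chooses its own encoding. The following is a minimal sketch that stores the sequence number of the last processed message as UTF-8 text. SessionCheckpoint, encode, and decode are hypothetical and are not part of the Service Bus SDK, and treating a null or empty array as "no checkpoint yet" is an assumption about how an unset state is surfaced.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration only; not part of the Azure SDK. */
final class SessionCheckpoint {
    private SessionCheckpoint() { }

    /** Encodes the sequence number of the last processed message as UTF-8 bytes. */
    static byte[] encode(long lastProcessedSequenceNumber) {
        return Long.toString(lastProcessedSequenceNumber).getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a state written by encode; returns -1 when no state is present. */
    static long decode(byte[] sessionState) {
        if (sessionState == null || sessionState.length == 0) {
            return -1L;
        }
        return Long.parseLong(new String(sessionState, StandardCharsets.UTF_8));
    }
}
```

A session receiver could then call receiver.setSessionState(SessionCheckpoint.encode(sequenceNumber)) after processing a message and SessionCheckpoint.decode(receiver.getSessionState()) when it resumes the session.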