ServiceBusClientBuilder Class
- java.lang.Object
- com.azure.messaging.servicebus.ServiceBusClientBuilder
Implements
public final class ServiceBusClientBuilder
implements TokenCredentialTrait<ServiceBusClientBuilder>, AzureNamedKeyCredentialTrait<ServiceBusClientBuilder>, ConnectionStringTrait<ServiceBusClientBuilder>, AzureSasCredentialTrait<ServiceBusClientBuilder>, AmqpTrait<ServiceBusClientBuilder>, ConfigurationTrait<ServiceBusClientBuilder>
This class provides a fluent builder API to aid the instantiation of clients to send and receive messages to/from Service Bus entities.
Credentials are required to perform operations against Azure Service Bus. They can be set by using one of the following methods:
- connectionString(String connectionString) with a connection string to the Service Bus namespace.
- credential(String fullyQualifiedNamespace, TokenCredential credential), credential(String fullyQualifiedNamespace, AzureSasCredential credential), and credential(String fullyQualifiedNamespace, AzureNamedKeyCredential credential) overloads can be used with the respective credentials that have access to the fully-qualified Service Bus namespace.
- credential(TokenCredential credential), credential(AzureSasCredential credential), and credential(AzureNamedKeyCredential credential) overloads can be used with their respective credentials. When using these overloads, fullyQualifiedNamespace(String fullyQualifiedNamespace) must also be set.
The credential used in the following samples is DefaultAzureCredential. It is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation.
Clients and sub-builders
ServiceBusClientBuilder can instantiate several clients. The client to instantiate depends on whether users are publishing or receiving messages and if the entity has Service Bus sessions enabled.
- Sending messages: Use the sender() sub-builder to create ServiceBusSenderAsyncClient and ServiceBusSenderClient.
- Receiving messages: Use the receiver() sub-builder to create ServiceBusReceiverAsyncClient and ServiceBusReceiverClient.
- Receiving messages from a session-enabled Service Bus entity: Use the sessionReceiver() sub-builder to create ServiceBusSessionReceiverAsyncClient and ServiceBusSessionReceiverClient.
- Receiving messages using a callback-based processor: Use the processor() sub-builder to create ServiceBusProcessorClient.
- Receiving messages from a session-enabled Service Bus entity using a callback-based processor: Use the sessionProcessor() sub-builder to create ServiceBusProcessorClient.
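The selection rules in the list above can be summarized as a small decision helper. This is an illustrative sketch only: the SubBuilderChooser class and its choose method are hypothetical and not part of the SDK. Only the returned sub-builder method names come from the list.

```java
final class SubBuilderChooser {
    /**
     * Returns the name of the ServiceBusClientBuilder sub-builder method
     * that matches the scenario (hypothetical helper for illustration).
     */
    static String choose(boolean receiving, boolean sessionEnabled, boolean callbackBased) {
        if (!receiving) {
            // The list names a single sub-builder for sending.
            return "sender()";
        }
        if (callbackBased) {
            // Callback-based processing: processor clients.
            return sessionEnabled ? "sessionProcessor()" : "processor()";
        }
        // Receiver clients that the application drives itself.
        return sessionEnabled ? "sessionReceiver()" : "receiver()";
    }
}
```

For example, choose(true, true, false) selects sessionReceiver(), which matches the third bullet in the list.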
Sending messages
Sample: Instantiate a synchronous sender and send a message
The following code sample demonstrates creating the synchronous client ServiceBusSenderClient and sending a message. The fullyQualifiedNamespace is the Service Bus namespace's host name. It is listed under the "Essentials" panel after navigating to the Service Bus namespace via the Azure Portal. The credential used is DefaultAzureCredential because it combines commonly used credentials in deployment and development and chooses the credential to use based on its running environment. When performance is important, consider using ServiceBusMessageBatch to publish multiple messages at once.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, credential)
    .sender()
    .queueName(queueName)
    .buildClient();
sender.sendMessage(new ServiceBusMessage("Foo bar"));
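When many messages are published, a common pattern with a size-limited batch is to add messages until the batch is full, send it, and start a new one. The sketch below shows that loop in plain Java so it runs without the SDK. BatchingSketch and its nested Batch type are hypothetical stand-ins, and the byte-count limit is a simplifying assumption. With the SDK, the analogous calls are ServiceBusSenderClient.createMessageBatch(), ServiceBusMessageBatch.tryAddMessage(ServiceBusMessage), and ServiceBusSenderClient.sendMessages(ServiceBusMessageBatch).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class BatchingSketch {
    /** Hypothetical stand-in for a size-limited message batch. */
    static final class Batch {
        private final int maxBytes;
        private int usedBytes;
        final List<String> messages = new ArrayList<>();

        Batch(int maxBytes) {
            this.maxBytes = maxBytes;
        }

        /** Adds the message if it fits; returns false when it does not. */
        boolean tryAdd(String message) {
            int size = message.getBytes(StandardCharsets.UTF_8).length;
            if (usedBytes + size > maxBytes) {
                return false;
            }
            usedBytes += size;
            messages.add(message);
            return true;
        }
    }

    /** Splits the messages into batches and returns each batch that would be sent. */
    static List<List<String>> sendAll(List<String> messages, int maxBytes) {
        List<List<String>> sent = new ArrayList<>();
        Batch batch = new Batch(maxBytes);
        for (String message : messages) {
            if (batch.tryAdd(message)) {
                continue;
            }
            // The batch is full: "send" it and start a new one.
            if (!batch.messages.isEmpty()) {
                sent.add(batch.messages);
                batch = new Batch(maxBytes);
            }
            if (!batch.tryAdd(message)) {
                throw new IllegalArgumentException("Message is too large for an empty batch");
            }
        }
        // Send whatever remains in the last, partially filled batch.
        if (!batch.messages.isEmpty()) {
            sent.add(batch.messages);
        }
        return sent;
    }
}
```

The final flush after the loop is easy to forget. Without it, the last partially filled batch would never be sent.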
Consuming messages
There are multiple clients for consuming messages from a Service Bus entity that does not have Service Bus sessions enabled.
Sample: Instantiate an asynchronous receiver
The code example below demonstrates creating an async receiver. The credential used is DefaultAzureCredential. It is appropriate for most scenarios, including local development and production environments. PEEK_LOCK and disableAutoComplete() are strongly recommended so users have control over message settlement.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, credential)
    .receiver()
    .disableAutoComplete()
    .queueName(queueName)
    .buildAsyncClient();
// When users are done with the receiver, dispose of the receiver.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
asyncReceiver.close();
Sample: Instantiate ServiceBusProcessorClient
The code example below demonstrates creating a processor client. The processor client is recommended for most production scenarios because it offers connection recovery. The credential used is DefaultAzureCredential. It is appropriate for most scenarios, including local development and production environments. PEEK_LOCK and disableAutoComplete() are strongly recommended so users have control over message settlement.
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    // Randomly complete or abandon each message. In real-world scenarios, call context.complete() once the
    // business logic has handled the message and Service Bus does not need to redeliver it; otherwise, call
    // context.abandon().
    final boolean success = Math.random() < 0.5;
    if (success) {
        try {
            context.complete();
        } catch (RuntimeException error) {
            System.out.printf("Completion of the message %s failed.%nError: %s%n",
                message.getMessageId(), error);
        }
    } else {
        try {
            context.abandon();
        } catch (RuntimeException error) {
            System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                message.getMessageId(), error);
        }
    }
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    if (errorContext.getException() instanceof ServiceBusException) {
        ServiceBusException exception = (ServiceBusException) errorContext.getException();
        System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
            exception.getReason());
    } else {
        System.out.printf("Error occurred: %s%n", errorContext.getException());
    }
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, tokenCredential)
    .processor()
    .queueName(queueName)
    .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
    .disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
    .processMessage(processMessage)
    .processError(processError)
    .buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
Consuming messages from a session-enabled Service Bus entity
Service Bus supports joint and ordered handling of unbounded sequences of messages through Service Bus sessions. Sessions can be used for first in, first out (FIFO) processing of messages. Queues and topics/subscriptions support Service Bus sessions; however, sessions must be enabled at the time of entity creation.
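As a mental model of session-based FIFO handling, messages can be thought of as grouped by session ID, with arrival order preserved inside each group. The sketch below is an in-memory illustration only. SessionOrderingSketch and its Message type are hypothetical, and the code says nothing about how the service is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SessionOrderingSketch {
    /** Hypothetical message carrying only a session ID and a body. */
    static final class Message {
        final String sessionId;
        final String body;

        Message(String sessionId, String body) {
            this.sessionId = sessionId;
            this.body = body;
        }
    }

    /** Groups message bodies by session ID, keeping arrival order within each session. */
    static Map<String, List<String>> groupBySession(List<Message> arrivals) {
        Map<String, List<String>> sessions = new LinkedHashMap<>();
        for (Message message : arrivals) {
            sessions.computeIfAbsent(message.sessionId, id -> new ArrayList<>()).add(message.body);
        }
        return sessions;
    }
}
```

Messages from different sessions may interleave on arrival, but a consumer of one session sees that session's messages in the order they were sent.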
Sample: Sending a message to a session-enabled queue
The snippet below demonstrates sending a message to a session-enabled Service Bus queue. Setting the session ID to "greetings" with setSessionId(String sessionId) sends the message to a Service Bus session with an ID of "greetings".
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
    .sender()
    .queueName(sessionEnabledQueueName)
    .buildClient();
// Setting sessionId publishes that message to a specific session, in this case, "greetings".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
    .setSessionId("greetings");
sender.sendMessage(message);
// Dispose of the sender.
sender.close();
Sample: Receive messages from first available session
To process messages from the first available session, switch to ServiceBusSessionReceiverClientBuilder and build the session receiver client. Use acceptNextSession() to find the first available session to process messages from.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, credential)
    .sessionReceiver()
    .disableAutoComplete()
    .queueName(sessionEnabledQueueName)
    .buildAsyncClient();
// Creates a client to receive messages from the first available session. It waits until
// AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
// completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
    receiver -> receiver.receiveMessages().flatMap(message -> {
        System.out.println("Received message: " + message.getBody());
        // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
        // 'isMessageProcessed' is a placeholder for the outcome of the application's own processing.
        if (isMessageProcessed) {
            return receiver.complete(message);
        } else {
            return receiver.abandon(message);
        }
    }),
    receiver -> Mono.fromRunnable(() -> {
        // Dispose of the receiver and sessionReceiver when done receiving messages.
        receiver.close();
        sessionReceiver.close();
    }));
// This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
// operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
// receiving messages.
Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
    () -> System.out.println("Receiving complete."));
Sample: Process messages from all sessions
The following code sample demonstrates the creation of the ServiceBusProcessorClient that processes all available sessions in the queue. ServiceBusSessionProcessorClientBuilder#maxConcurrentSessions(int) indicates how many sessions the processor will process at the same time. The credential used is DefaultAzureCredential. It is appropriate for most scenarios, including local development and production environments. PEEK_LOCK and disableAutoComplete() are strongly recommended so users have control over message settlement.
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
    ServiceBusReceivedMessage message = context.getMessage();
    System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
        message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
Consumer<ServiceBusErrorContext> onError = context -> {
    System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
        context.getFullyQualifiedNamespace(), context.getEntityPath());
    if (context.getException() instanceof ServiceBusException) {
        ServiceBusException exception = (ServiceBusException) context.getException();
        System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
            exception.getReason());
    } else {
        System.out.printf("Error occurred: %s%n", context.getException());
    }
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, tokenCredential)
    .sessionProcessor()
    .queueName(sessionEnabledQueueName)
    .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
    .disableAutoComplete()
    .maxConcurrentSessions(2)
    .processMessage(onMessage)
    .processError(onError)
    .buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
sessionProcessor.start();
// Stop processor and dispose when done processing messages.
sessionProcessor.stop();
sessionProcessor.close();
Connection sharing
Creating a connection to Service Bus requires resources. If your architecture allows, an application should share a connection between clients, which can be achieved by sharing the top-level builder as shown below.
Sharing a connection between clients
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// Any clients created from this builder will share the underlying connection.
ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, credential);
// Create receiver and sender which will share the connection.
ServiceBusReceiverClient receiver = sharedConnectionBuilder
    .receiver()
    .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
    .queueName(queueName)
    .buildClient();
ServiceBusSenderClient sender = sharedConnectionBuilder
    .sender()
    .queueName(queueName)
    .buildClient();
// Use the clients and finally close them.
try {
    sender.sendMessage(new ServiceBusMessage("payload"));
    receiver.receiveMessages(1);
} finally {
    // Clients should be long-lived objects as they require resources
    // and time to establish a connection to the service.
    sender.close();
    receiver.close();
}
Constructor Summary
Constructor | Description |
---|---|
ServiceBusClientBuilder() | Creates a new instance with the default transport AMQP. |
Method Summary
Modifier and Type | Method and Description |
---|---|
ServiceBusClientBuilder | clientOptions(ClientOptions clientOptions): Sets the ClientOptions to be sent from the client built from this builder, enabling customization of certain properties, as well as support for the addition of custom header information. |
ServiceBusClientBuilder | configuration(Configuration configuration): Sets the configuration store that is used during construction of the service client. |
ServiceBusClientBuilder | connectionString(String connectionString): Sets the connection string for a Service Bus namespace or a specific Service Bus resource. |
ServiceBusClientBuilder | credential(AzureNamedKeyCredential credential): Sets the credential with the shared access policies for the Service Bus resource. |
ServiceBusClientBuilder | credential(AzureSasCredential credential): Sets the credential with Shared Access Signature for the Service Bus resource. |
ServiceBusClientBuilder | credential(TokenCredential credential): Sets the TokenCredential used to authorize requests sent to the service. |
ServiceBusClientBuilder | credential(String fullyQualifiedNamespace, AzureNamedKeyCredential credential): Sets the credential with the shared access policies for the Service Bus resource. |
ServiceBusClientBuilder | credential(String fullyQualifiedNamespace, AzureSasCredential credential): Sets the credential with Shared Access Signature for the Service Bus resource. |
ServiceBusClientBuilder | credential(String fullyQualifiedNamespace, TokenCredential credential): Sets the credential by using a TokenCredential for the Service Bus resource. |
ServiceBusClientBuilder | customEndpointAddress(String customEndpointAddress): Sets a custom endpoint address when connecting to the Service Bus service. |
ServiceBusClientBuilder | enableCrossEntityTransactions(): Enables cross-entity transactions on the connection to Service Bus. |
ServiceBusClientBuilder | fullyQualifiedNamespace(String fullyQualifiedNamespace): Sets the fully-qualified namespace for the Service Bus. |
ServiceBusClientBuilder.ServiceBusProcessorClientBuilder | processor(): A new instance of ServiceBusProcessorClientBuilder used to configure a ServiceBusProcessorClient instance. |
ServiceBusClientBuilder | proxyOptions(ProxyOptions proxyOptions): Sets the proxy configuration to use for ServiceBusSenderAsyncClient. |
ServiceBusClientBuilder.ServiceBusReceiverClientBuilder | receiver(): A new instance of ServiceBusReceiverClientBuilder used to configure Service Bus message receivers. |
ServiceBusClientBuilder | retryOptions(AmqpRetryOptions retryOptions): Sets the retry options for Service Bus clients. |
ServiceBusClientBuilder.ServiceBusRuleManagerBuilder | ruleManager(): A new instance of ServiceBusRuleManagerBuilder used to configure a Service Bus rule manager instance. |
ServiceBusClientBuilder.ServiceBusSenderClientBuilder | sender(): A new instance of ServiceBusSenderClientBuilder used to configure Service Bus message senders. |
ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder | sessionProcessor(): A new instance of ServiceBusSessionProcessorClientBuilder used to configure a Service Bus processor instance that processes sessions. |
ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder | sessionReceiver(): A new instance of ServiceBusSessionReceiverClientBuilder used to configure session-aware Service Bus message receivers. |
ServiceBusClientBuilder | transportType(AmqpTransportType transportType): Sets the transport type by which all the communication with Azure Service Bus occurs. |
Methods inherited from java.lang.Object
Constructor Details
ServiceBusClientBuilder
public ServiceBusClientBuilder()
Creates a new instance with the default transport AMQP.
Method Details
clientOptions
public ServiceBusClientBuilder clientOptions(ClientOptions clientOptions)
Sets the ClientOptions to be sent from the client built from this builder, enabling customization of certain properties, as well as support the addition of custom header information. Refer to the ClientOptions documentation for more information.
Parameters:
Returns:
configuration
public ServiceBusClientBuilder configuration(Configuration configuration)
Sets the configuration store that is used during construction of the service client. If not specified, the default configuration store is used to configure Service Bus clients. Use NONE to bypass using configuration settings during construction.
Parameters:
Returns:
connectionString
public ServiceBusClientBuilder connectionString(String connectionString)
Sets the connection string for a Service Bus namespace or a specific Service Bus resource.
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(AzureNamedKeyCredential credential)
Sets the credential with the shared access policies for the Service Bus resource. You can find the shared access policies in the Azure portal or with the Azure CLI. For instance, in the portal, "Shared Access policies" lists each 'policy' with its 'Primary Key' and 'Secondary Key'. The 'name' attribute of the AzureNamedKeyCredential is the 'policy' in the portal, and the 'key' attribute can be either the 'Primary Key' or the 'Secondary Key'. This method and connectionString(String connectionString) take the same information in different forms, but this method allows you to update the name and key.
Parameters:
Returns:
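To illustrate how a connection string and a named key credential carry the same information, the sketch below splits a namespace connection string of the usual form Endpoint=sb://{namespace}.servicebus.windows.net/;SharedAccessKeyName={policy};SharedAccessKey={key} into its parts. ConnectionStringSketch is a hypothetical helper, not the SDK's parser, and the values in the usage note are placeholders. The policy name and key it extracts correspond to the 'name' and 'key' of an AzureNamedKeyCredential, and the endpoint host corresponds to the fully-qualified namespace.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

final class ConnectionStringSketch {
    /** Splits "Key=Value;Key=Value" segments into a map (hypothetical helper). */
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new HashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.isEmpty()) {
                continue;
            }
            // Split on the first '=' only, because keys are often base64 and may end with '='.
            int separator = segment.indexOf('=');
            if (separator <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + segment);
            }
            parts.put(segment.substring(0, separator), segment.substring(separator + 1));
        }
        return parts;
    }

    /** Extracts the host name of the "Endpoint" part, i.e. the fully-qualified namespace. */
    static String fullyQualifiedNamespace(Map<String, String> parts) {
        return URI.create(parts.get("Endpoint")).getHost();
    }
}
```

For a placeholder string such as "Endpoint=sb://contoso.servicebus.windows.net/;SharedAccessKeyName=myPolicy;SharedAccessKey=abc123=", the parts map to the namespace contoso.servicebus.windows.net, the name myPolicy, and the key abc123=.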
credential
public ServiceBusClientBuilder credential(AzureSasCredential credential)
Sets the credential with Shared Access Signature for the Service Bus resource. Refer to Service Bus access control with Shared Access Signatures.
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(TokenCredential credential)
Sets the TokenCredential used to authorize requests sent to the service. Refer to the Azure SDK for Java identity and authentication documentation for more details on proper usage of the TokenCredential type.
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureNamedKeyCredential credential)
Sets the credential with the shared access policies for the Service Bus resource. You can find the shared access policies in the Azure portal or with the Azure CLI. For instance, in the portal, "Shared Access policies" lists each 'policy' with its 'Primary Key' and 'Secondary Key'. The 'name' attribute of the AzureNamedKeyCredential is the 'policy' in the portal, and the 'key' attribute can be either the 'Primary Key' or the 'Secondary Key'. This method and connectionString(String connectionString) take the same information in different forms, but this method allows you to update the name and key.
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureSasCredential credential)
Sets the credential with Shared Access Signature for the Service Bus resource. Refer to Service Bus access control with Shared Access Signatures.
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, TokenCredential credential)
Sets the credential by using a TokenCredential for the Service Bus resource. azure-identity has multiple TokenCredential implementations that can be used to authenticate the access to the Service Bus resource.
Parameters:
Returns:
customEndpointAddress
public ServiceBusClientBuilder customEndpointAddress(String customEndpointAddress)
Sets a custom endpoint address when connecting to the Service Bus service. This can be useful when your network does not allow connecting to the standard Azure Service Bus endpoint address, but does allow connecting through an intermediary. For example: https://my.custom.endpoint.com:55300.
If no port is specified, the default port for the transportType(AmqpTransportType transportType) is used.
Parameters:
Returns:
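The port fallback described above can be sketched as follows. EndpointPortSketch and its Transport enum are hypothetical stand-ins for illustration, and the defaults used here (5671 for AMQP over TLS and 443 for AMQP over WebSockets) are the standard ports for those transports, stated as an assumption about what the SDK applies.

```java
import java.net.URI;

final class EndpointPortSketch {
    /** Hypothetical stand-in mirroring the two AmqpTransportType values. */
    enum Transport { AMQP, AMQP_WEB_SOCKETS }

    /** Returns the explicit port of the address, or an assumed default for the transport. */
    static int resolvePort(String customEndpointAddress, Transport transport) {
        URI uri = URI.create(customEndpointAddress);
        if (uri.getPort() != -1) {
            // A port was given explicitly, e.g. https://my.custom.endpoint.com:55300.
            return uri.getPort();
        }
        // Assumed defaults: 5671 for AMQP over TLS, 443 for AMQP over WebSockets.
        return transport == Transport.AMQP ? 5671 : 443;
    }
}
```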
enableCrossEntityTransactions
public ServiceBusClientBuilder enableCrossEntityTransactions()
Enables cross-entity transactions on the connection to Service Bus. Use this feature only when your transaction scope spans different Service Bus entities. The feature works by routing all messages through one 'send-via' entity on the server side. Once clients are created for multiple entities, the first entity on which an operation occurs becomes the 'send-via' entity through which all subsequent sends are routed. This enables the service to perform a transaction that spans multiple entities. It also means that any entity performing its first operation afterwards must either be a sender or, if it is a receiver, be on the same entity as the 'send-via' entity; otherwise, the service cannot ensure that the transaction is committed, because it cannot route a receive operation through a different entity. For instance, if SenderA (for entity A) and ReceiverB (for entity B) are created from a client with cross-entity transactions enabled, you need to receive first with ReceiverB for this to work. If you first send to entity A and then attempt to receive from entity B, an exception is thrown.
Avoid using non-transaction API on this client
This feature sets up a connection to Service Bus that is optimized for cross-entity transactions. Once all the clients have been set up, the first receiver or sender used initializes the 'send-via' queue as a single message transfer entity, and all messages flow through this queue. This client is therefore not suitable for any non-transaction API.
When not to enable this feature
Do not enable this feature if your transaction involves only one Service Bus entity, for example when you receive from one queue or subscription and settle your own messages as part of one transaction.
Returns:
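The ordering rule for the 'send-via' entity can be modeled with a few lines of plain Java. This is a toy model of the behavior described above, not SDK code: SendViaModel is hypothetical, and the exception type it throws is an assumption chosen for the sketch.

```java
final class SendViaModel {
    // The entity through which all sends are routed; null until the first operation.
    private String sendViaEntity;

    String sendViaEntity() {
        return sendViaEntity;
    }

    /** A send is always allowed; the first operation fixes the 'send-via' entity. */
    void send(String entity) {
        if (sendViaEntity == null) {
            sendViaEntity = entity;
        }
    }

    /** A receive is allowed only as the first operation or on the 'send-via' entity. */
    void receive(String entity) {
        if (sendViaEntity == null) {
            sendViaEntity = entity;
            return;
        }
        if (!sendViaEntity.equals(entity)) {
            throw new IllegalStateException(
                "Cannot receive from '" + entity + "': sends are routed via '" + sendViaEntity + "'");
        }
    }
}
```

In this model, calling receive("B") and then send("A") succeeds with B as the 'send-via' entity, while send("A") followed by receive("B") throws, mirroring the SenderA and ReceiverB example.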
fullyQualifiedNamespace
public ServiceBusClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)
Sets the fully-qualified namespace for the Service Bus.
Parameters:
Returns:
processor
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processor()
A new instance of ServiceBusProcessorClientBuilder used to configure a ServiceBusProcessorClient instance.
Returns:
proxyOptions
public ServiceBusClientBuilder proxyOptions(ProxyOptions proxyOptions)
Sets the proxy configuration to use for ServiceBusSenderAsyncClient. When a proxy is configured, AMQP_WEB_SOCKETS must be used for the transport type.
Parameters:
Returns:
receiver
public ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiver()
A new instance of ServiceBusReceiverClientBuilder used to configure Service Bus message receivers.
Returns:
retryOptions
public ServiceBusClientBuilder retryOptions(AmqpRetryOptions retryOptions)
Sets the retry options for Service Bus clients. If not specified, the default retry options are used.
Parameters:
Returns:
ruleManager
public ServiceBusClientBuilder.ServiceBusRuleManagerBuilder ruleManager()
A new instance of ServiceBusRuleManagerBuilder used to configure a Service Bus rule manager instance.
Returns:
sender
public ServiceBusClientBuilder.ServiceBusSenderClientBuilder sender()
A new instance of ServiceBusSenderClientBuilder used to configure Service Bus message senders.
Returns:
sessionProcessor
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder sessionProcessor()
A new instance of ServiceBusSessionProcessorClientBuilder used to configure a Service Bus processor instance that processes sessions.
Returns:
sessionReceiver
public ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiver()
A new instance of ServiceBusSessionReceiverClientBuilder used to configure session aware Service Bus message receivers.
Returns:
transportType
public ServiceBusClientBuilder transportType(AmqpTransportType transportType)
Sets the transport type by which all the communication with Azure Service Bus occurs. Default value is AMQP.
Parameters:
Returns: