Dela via


ServiceBusClientBuilder Class

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusClientBuilder

Implements

public final class ServiceBusClientBuilder
implements TokenCredentialTrait<ServiceBusClientBuilder>, AzureNamedKeyCredentialTrait<ServiceBusClientBuilder>, ConnectionStringTrait<ServiceBusClientBuilder>, AzureSasCredentialTrait<ServiceBusClientBuilder>, AmqpTrait<ServiceBusClientBuilder>, ConfigurationTrait<ServiceBusClientBuilder>

This class provides a fluent builder API to aid the instantiation of clients to send and receive messages to/from Service Bus entities.

Credentials are required to perform operations against Azure Service Bus. They can be set by using one of the following methods:

The credential used in the following samples is DefaultAzureCredential for authentication. It is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation".

Clients and sub-builders

ServiceBusClientBuilder can instantiate several clients. The client to instantiate depends on whether users are publishing or receiving messages and if the entity has Service Bus sessions enabled.

Sending messages

Sample: Instantiate a synchronous sender and send a message

The following code sample demonstrates the creation of the synchronous client ServiceBusSenderClient and sending a message. The fullyQualifiedNamespace is the Service Bus namespace's host name. It is listed under the "Essentials" panel after navigating to the Service Bus namespace via Azure Portal. The credential used is DefaultAzureCredential because it combines commonly used credentials in deployment and development and chooses the credential to used based on its running environment. When performance is important, consider using ServiceBusMessageBatch to publish multiple messages at once.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusSenderClient sender = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sender()
     .queueName(queueName)
     .buildClient();

 sender.sendMessage(new ServiceBusMessage("Foo bar"));

Consuming messages

There are multiple clients for consuming messages from a Service Bus entity (that is not have Service Bus sessions enabled).

Sample: Instantiate an asynchronous receiver

The code example below demonstrates creating an async receiver. The credential used is DefaultAzureCredential for authentication. It is appropriate for most scenarios, including local development and production environments. PEEK_LOCK and disableAutoComplete() are strongly recommended so users have control over message settlement.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .receiver()
     .disableAutoComplete()
     .queueName(queueName)
     .buildAsyncClient();

 // When users are done with the receiver, dispose of the receiver.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 asyncReceiver.close();

Sample: Instantiate ServiceBusProcessorClient

The code example below demonstrates creating a processor client. The processor client is recommended for most production scenarios because it offers connection recovery. The credential used is DefaultAzureCredential for authentication. It is appropriate for most scenarios, including local development and production environments. PEEK_LOCK and disableAutoComplete() are strongly recommended so users have control over message settlement.

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
     // handling message reaches desired state such that it doesn't require Service Bus to redeliver
     // the same message, then context.complete() should be called otherwise context.abandon().
     final boolean success = Math.random() < 0.5;
     if (success) {
         try {
             context.complete();
         } catch (RuntimeException error) {
             System.out.printf("Completion of the message %s failed.%n Error: %s%n",
                 message.getMessageId(), error);
         }
     } else {
         try {
             context.abandon();
         } catch (RuntimeException error) {
             System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     }
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()  // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

Consuming messages from a session-enabled Service Bus entity

Service Bus supports joint and ordered handling of unbounded sequences of messages through Service Bus sessions. Sessions can be used as a first in, first out (FIFO) processing of messages. Queues and topics/subscriptions support Service Bus sessions, however, it must be enabled at the time of entity creation.

Sample: Sending a message to a session-enabled queue

The snippet below demonstrates sending a message to a Service Bus sessions enabled queue. Setting setMessageId(String messageId) property to "greetings" will send the message to a Service Bus session with an id of "greetings".

// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusSenderClient sender = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
     .sender()
     .queueName(sessionEnabledQueueName)
     .buildClient();

 // Setting sessionId publishes that message to a specific session, in this case, "greeting".
 ServiceBusMessage message = new ServiceBusMessage("Hello world")
     .setSessionId("greetings");

 sender.sendMessage(message);

 // Dispose of the sender.
 sender.close();

Sample: Receive messages from first available session

To process messages from the first available session, switch to ServiceBusSessionReceiverClientBuilder and build the session receiver client. Use acceptNextSession() to find the first available session to process messages from.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

Sample: Process messages from all sessions

The following code sample demonstrates the creation the ServiceBusProcessorClient that processes all available sessions in the queue. ServiceBusSessionProcessorClientBuilder#maxConcurrentSessions(int) indicates how many sessions the processor will process at the same time. The credential used is DefaultAzureCredential for authentication. It is appropriate for most scenarios, including local development and production environments. PEEK_LOCK and disableAutoComplete() are strongly recommended so users have control over message settlement.

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
     ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 Consumer<ServiceBusErrorContext> onError = context -> {
     System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
         context.getFullyQualifiedNamespace(), context.getEntityPath());

     if (context.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) context.getException();

         System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", context.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .sessionProcessor()
     .queueName(sessionEnabledQueueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()
     .maxConcurrentSessions(2)
     .processMessage(onMessage)
     .processError(onError)
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 sessionProcessor.start();

 // Stop processor and dispose when done processing messages.
 sessionProcessor.stop();
 sessionProcessor.close();

Connection sharing

The creation of a connection to Service Bus requires resources. If your architecture allows, an application should share connection between clients which can be achieved by sharing the top level builder as shown below.

Sharing a connection between clients

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // Any clients created from this builder will share the underlying connection.
 ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential);

 // Create receiver and sender which will share the connection.
 ServiceBusReceiverClient receiver = sharedConnectionBuilder
     .receiver()
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .queueName(queueName)
     .buildClient();
 ServiceBusSenderClient sender = sharedConnectionBuilder
     .sender()
     .queueName(queueName)
     .buildClient();

 // Use the clients and finally close them.
 try {
     sender.sendMessage(new ServiceBusMessage("payload"));
     receiver.receiveMessages(1);
 } finally {
     // Clients should be long-lived objects as they require resources
     // and time to establish a connection to the service.
     sender.close();
     receiver.close();
 }

Constructor Summary

Constructor Description
ServiceBusClientBuilder()

Creates a new instance with the default transport AMQP.

Method Summary

Modifier and Type Method and Description
ServiceBusClientBuilder clientOptions(ClientOptions clientOptions)

Sets the ClientOptions to be sent from the client built from this builder, enabling customization of certain properties, as well as support the addition of custom header information.

ServiceBusClientBuilder configuration(Configuration configuration)

Sets the configuration store that is used during construction of the service client.

ServiceBusClientBuilder connectionString(String connectionString)

Sets the connection string for a Service Bus namespace or a specific Service Bus resource.

ServiceBusClientBuilder credential(AzureNamedKeyCredential credential)

Sets the credential with the shared access policies for the Service Bus resource.

ServiceBusClientBuilder credential(AzureSasCredential credential)

Sets the credential with Shared Access Signature for the Service Bus resource.

ServiceBusClientBuilder credential(TokenCredential credential)

Sets the TokenCredential used to authorize requests sent to the service.

ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureNamedKeyCredential credential)

Sets the credential with the shared access policies for the Service Bus resource.

ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureSasCredential credential)

Sets the credential with Shared Access Signature for the Service Bus resource.

ServiceBusClientBuilder credential(String fullyQualifiedNamespace, TokenCredential credential)

Sets the credential by using a TokenCredential for the Service Bus resource.

ServiceBusClientBuilder customEndpointAddress(String customEndpointAddress)

Sets a custom endpoint address when connecting to the Service Bus service.

ServiceBusClientBuilder enableCrossEntityTransactions()

Enable cross entity transaction on the connection to Service bus.

ServiceBusClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)

Sets the fully-qualified namespace for the Service Bus.

ServiceBusProcessorClientBuilder processor()

A new instance of ServiceBusProcessorClientBuilder used to configure ServiceBusProcessorClient instance.

ServiceBusClientBuilder proxyOptions(ProxyOptions proxyOptions)

Sets the proxy configuration to use for ServiceBusSenderAsyncClient.

ServiceBusReceiverClientBuilder receiver()

A new instance of ServiceBusReceiverClientBuilder used to configure Service Bus message receivers.

ServiceBusClientBuilder retryOptions(AmqpRetryOptions retryOptions)

Sets the retry options for Service Bus clients.

ServiceBusRuleManagerBuilder ruleManager()

A new instance of ServiceBusRuleManagerBuilder used to configure a Service Bus rule manager instance.

ServiceBusSenderClientBuilder sender()

A new instance of ServiceBusSenderClientBuilder used to configure Service Bus message senders.

ServiceBusSessionProcessorClientBuilder sessionProcessor()

A new instance of ServiceBusSessionProcessorClientBuilder used to configure a Service Bus processor instance that processes sessions.

ServiceBusSessionReceiverClientBuilder sessionReceiver()

A new instance of ServiceBusSessionReceiverClientBuilder used to configure session aware Service Bus message receivers.

ServiceBusClientBuilder transportType(AmqpTransportType transportType)

Sets the transport type by which all the communication with Azure Service Bus occurs.

Methods inherited from java.lang.Object

Constructor Details

ServiceBusClientBuilder

public ServiceBusClientBuilder()

Creates a new instance with the default transport AMQP.

Method Details

clientOptions

public ServiceBusClientBuilder clientOptions(ClientOptions clientOptions)

Sets the ClientOptions to be sent from the client built from this builder, enabling customization of certain properties, as well as support the addition of custom header information. Refer to the ClientOptions documentation for more information.

Parameters:

clientOptions - to be set on the client.

Returns:

The updated ServiceBusClientBuilder object.

configuration

public ServiceBusClientBuilder configuration(Configuration configuration)

Sets the configuration store that is used during construction of the service client. If not specified, the default configuration store is used to configure Service Bus clients. Use NONE to bypass using configuration settings during construction.

Parameters:

configuration - The configuration store used to configure Service Bus clients.

Returns:

The updated ServiceBusClientBuilder object.

connectionString

public ServiceBusClientBuilder connectionString(String connectionString)

Sets the connection string for a Service Bus namespace or a specific Service Bus resource.

Parameters:

connectionString - Connection string for a Service Bus namespace or a specific Service Bus resource.

Returns:

The updated ServiceBusClientBuilder object.

credential

public ServiceBusClientBuilder credential(AzureNamedKeyCredential credential)

Sets the credential with the shared access policies for the Service Bus resource. You can find the shared access policies on the azure portal or Azure CLI. For instance, on the portal, "Shared Access policies" has 'policy' and its 'Primary Key' and 'Secondary Key'. The 'name' attribute of the AzureNamedKeyCredential is the 'policy' on portal and the 'key' attribute can be either 'Primary Key' or 'Secondary Key'. This method and connectionString(String connectionString) take the same information in different forms. But it allows you to update the name and key.

Parameters:

credential - AzureNamedKeyCredential to be used for authentication.

Returns:

The updated ServiceBusClientBuilder object.

credential

public ServiceBusClientBuilder credential(AzureSasCredential credential)

Sets the credential with Shared Access Signature for the Service Bus resource. Refer to Service Bus access control with Shared Access Signatures.

Parameters:

credential - AzureSasCredential to be used for authentication.

Returns:

The updated ServiceBusClientBuilder object.

credential

public ServiceBusClientBuilder credential(TokenCredential credential)

Sets the TokenCredential used to authorize requests sent to the service. Refer to the Azure SDK for Java identity and authentication documentation for more details on proper usage of the TokenCredential type.

Parameters:

credential - The token credential to use for authentication. Access controls may be specified by the ServiceBus namespace or the requested Service Bus entity, depending on Azure configuration.

Returns:

The updated ServiceBusClientBuilder object.

credential

public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureNamedKeyCredential credential)

Sets the credential with the shared access policies for the Service Bus resource. You can find the shared access policies on the azure portal or Azure CLI. For instance, on the portal, "Shared Access policies" has 'policy' and its 'Primary Key' and 'Secondary Key'. The 'name' attribute of the AzureNamedKeyCredential is the 'policy' on portal and the 'key' attribute can be either 'Primary Key' or 'Secondary Key'. This method and connectionString(String connectionString) take the same information in different forms. But it allows you to update the name and key.

Parameters:

fullyQualifiedNamespace - The fully-qualified namespace for the Service Bus.
credential - AzureNamedKeyCredential to be used for authentication.

Returns:

The updated ServiceBusClientBuilder object.

credential

public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureSasCredential credential)

Sets the credential with Shared Access Signature for the Service Bus resource. Refer to Service Bus access control with Shared Access Signatures.

Parameters:

fullyQualifiedNamespace - The fully-qualified namespace for the Service Bus.
credential - AzureSasCredential to be used for authentication.

Returns:

The updated ServiceBusClientBuilder object.

credential

public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, TokenCredential credential)

Sets the credential by using a TokenCredential for the Service Bus resource. azure-identity has multiple TokenCredential implementations that can be used to authenticate the access to the Service Bus resource.

Parameters:

fullyQualifiedNamespace - The fully-qualified namespace for the Service Bus.
credential - The token credential to use for authentication. Access controls may be specified by the ServiceBus namespace or the requested Service Bus entity, depending on Azure configuration.

Returns:

The updated ServiceBusClientBuilder object.

customEndpointAddress

public ServiceBusClientBuilder customEndpointAddress(String customEndpointAddress)

Sets a custom endpoint address when connecting to the Service Bus service. This can be useful when your network does not allow connecting to the standard Azure Service Bus endpoint address, but does allow connecting through an intermediary. For example: https://my.custom.endpoint.com:55300.

If no port is specified, the default port for the transportType(AmqpTransportType transportType) is used.

Parameters:

customEndpointAddress - The custom endpoint address.

Returns:

The updated ServiceBusClientBuilder object.

enableCrossEntityTransactions

public ServiceBusClientBuilder enableCrossEntityTransactions()

Enable cross entity transaction on the connection to Service bus. Use this feature only when your transaction scope spans across different Service Bus entities. This feature is achieved by routing all the messages through one 'send-via' entity on server side as explained next. Once clients are created for multiple entities, the first entity that an operation occurs on becomes the entity through which all subsequent sends will be routed through ('send-via' entity). This enables the service to perform a transaction that is meant to span multiple entities. This means that subsequent entities that perform their first operation need to either be senders, or if they are receivers they need to be on the same entity as the initial entity through which all sends are routed through (otherwise the service would not be able to ensure that the transaction is committed because it cannot route a receive operation through a different entity). For instance, if you have SenderA (For entity A) and ReceiverB (For entity B) that are created from a client with cross-entity transactions enabled, you would need to receive first with ReceiverB to allow this to work. If you first send to entity A, and then attempted to receive from entity B, an exception would be thrown.

Avoid using non-transaction API on this client

Since this feature will set up connection to Service Bus optimised to enable this feature. Once all the clients have been setup, the first receiver or sender used will initialize 'send-via' queue as a single message transfer entity. All the messages will flow via this queue. Thus this client is not suitable for any non-transaction API.

When not to enable this feature

If your transaction is involved in one Service bus entity only. For example you are receiving from one queue/subscription and you want to settle your own messages which are part of one transaction.

Returns:

The updated ServiceBusSenderClientBuilder object.

fullyQualifiedNamespace

public ServiceBusClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)

Sets the fully-qualified namespace for the Service Bus.

Parameters:

fullyQualifiedNamespace - The fully-qualified namespace for the Service Bus.

Returns:

The updated ServiceBusClientBuilder object.

processor

public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processor()

A new instance of ServiceBusProcessorClientBuilder used to configure ServiceBusProcessorClient instance.

Returns:

A new instance of ServiceBusProcessorClientBuilder.

proxyOptions

public ServiceBusClientBuilder proxyOptions(ProxyOptions proxyOptions)

Sets the proxy configuration to use for ServiceBusSenderAsyncClient. When a proxy is configured, AMQP_WEB_SOCKETS must be used for the transport type.

Parameters:

proxyOptions - The proxy configuration to use.

Returns:

The updated ServiceBusClientBuilder object.

receiver

public ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiver()

A new instance of ServiceBusReceiverClientBuilder used to configure Service Bus message receivers.

Returns:

A new instance of ServiceBusReceiverClientBuilder.

retryOptions

public ServiceBusClientBuilder retryOptions(AmqpRetryOptions retryOptions)

Sets the retry options for Service Bus clients. If not specified, the default retry options are used.

Parameters:

retryOptions - The retry options to use.

Returns:

The updated ServiceBusClientBuilder object.

ruleManager

public ServiceBusClientBuilder.ServiceBusRuleManagerBuilder ruleManager()

A new instance of ServiceBusRuleManagerBuilder used to configure a Service Bus rule manager instance.

Returns:

A new instance of ServiceBusRuleManagerBuilder.

sender

public ServiceBusClientBuilder.ServiceBusSenderClientBuilder sender()

A new instance of ServiceBusSenderClientBuilder used to configure Service Bus message senders.

Returns:

A new instance of ServiceBusSenderClientBuilder.

sessionProcessor

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder sessionProcessor()

A new instance of ServiceBusSessionProcessorClientBuilder used to configure a Service Bus processor instance that processes sessions.

Returns:

A new instance of ServiceBusSessionProcessorClientBuilder.

sessionReceiver

public ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiver()

A new instance of ServiceBusSessionReceiverClientBuilder used to configure session aware Service Bus message receivers.

Returns:

A new instance of ServiceBusSessionReceiverClientBuilder.

transportType

public ServiceBusClientBuilder transportType(AmqpTransportType transportType)

Sets the transport type by which all the communication with Azure Service Bus occurs. Default value is AMQP.

Parameters:

transportType - The transport type to use.

Returns:

The updated ServiceBusClientBuilder object.

Applies to