Dela via


ServiceBusSenderAsyncClient Class

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusSenderAsyncClient

Implements

public final class ServiceBusSenderAsyncClient
implements AutoCloseable

An asynchronous client to send messages to a Service Bus resource.

The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation".

Sample: Create an instance of sender

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusSenderAsyncClient asyncSender = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sender()
     .queueName(queueName)
     .buildAsyncClient();

 // When users are done with the sender, they should dispose of it.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 asyncSender.close();

Sample: Send messages to a Service Bus resource

// `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
 // operation.  Users should use the callbacks on `subscribe` to understand the status of the send operation.
 asyncSender.createMessageBatch().flatMap(batch -> {
     batch.tryAddMessage(new ServiceBusMessage("test-1"));
     batch.tryAddMessage(new ServiceBusMessage("test-2"));
     return asyncSender.sendMessages(batch);
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred while sending batch:" + error);
 }, () -> {
     System.out.println("Send complete.");
 });

Sample: Send messages using a size-limited ServiceBusMessageBatch to a Service Bus resource

Flux<ServiceBusMessage> telemetryMessages = Flux.just(firstMessage, secondMessage);

 // Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
 // In this case, all the batches created with these options are limited to 256 bytes.
 CreateMessageBatchOptions options = new CreateMessageBatchOptions()
     .setMaximumSizeInBytes(256);
 AtomicReference<ServiceBusMessageBatch> currentBatch = new AtomicReference<>();

 // Sends the current batch if it is not null and not empty.  If the current batch is null, sets it.
 // Returns the batch to work with.
 Mono<ServiceBusMessageBatch> sendBatchAndGetCurrentBatchOperation = Mono.defer(() -> {
     ServiceBusMessageBatch batch = currentBatch.get();

     if (batch == null) {
         return asyncSender.createMessageBatch(options);
     }

     if (batch.getCount() > 0) {
         return asyncSender.sendMessages(batch).then(
             asyncSender.createMessageBatch(options)
                 .handle((ServiceBusMessageBatch newBatch, SynchronousSink<ServiceBusMessageBatch> sink) -> {
                     // Expect that the batch we just sent is the current one. If it is not, there's a race
                     // condition accessing currentBatch reference.
                     if (!currentBatch.compareAndSet(batch, newBatch)) {
                         sink.error(new IllegalStateException(
                             "Expected that the object in currentBatch was batch. But it is not."));
                     } else {
                         sink.next(newBatch);
                     }
                 }));
     } else {
         return Mono.just(batch);
     }
 });

 // The sample Flux contains two messages, but it could be an infinite stream of telemetry messages.
 Flux<Void> sendMessagesOperation = telemetryMessages.flatMap(message -> {
     return sendBatchAndGetCurrentBatchOperation.flatMap(batch -> {
         if (batch.tryAddMessage(message)) {
             return Mono.empty();
         } else {
             return sendBatchAndGetCurrentBatchOperation
                 .handle((ServiceBusMessageBatch newBatch, SynchronousSink<Void> sink) -> {
                     if (!newBatch.tryAddMessage(message)) {
                         sink.error(new IllegalArgumentException(
                             "Message is too large to fit in an empty batch."));
                     } else {
                         sink.complete();
                     }
                 });
         }
     });
 });

 // `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
 // operation.  Users should use the callbacks on `subscribe` to understand the status of the send operation.
 Disposable disposable = sendMessagesOperation.then(sendBatchAndGetCurrentBatchOperation)
     .subscribe(batch -> {
         System.out.println("Last batch should be empty: " + batch.getCount());
     }, error -> {
         System.err.println("Error sending telemetry messages: " + error);
     }, () -> {
         System.out.println("Completed.");

         // Continue using the sender and finally, dispose of the sender.
         // Clients should be long-lived objects as they require resources
         // and time to establish a connection to the service.
         asyncSender.close();
     });

Sample: Sending a message to a session-enabled queue

The snippet below demonstrates sending a message to a Service Bus sessions enabled queue. Setting setMessageId(String messageId) property to "greetings" will send the message to a Service Bus session with an id of "greetings".

// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
     .sender()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Setting sessionId publishes that message to a specific session, in this case, "greeting".
 ServiceBusMessage message = new ServiceBusMessage("Hello world")
     .setSessionId("greetings");

 // `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
 // operation.  Users should use the callbacks on `subscribe` to understand the status of the send operation.
 sender.sendMessage(message).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred publishing batch: " + error);
 }, () -> {
     System.out.println("Send complete.");
 });

 // Continue using the sender and finally, dispose of the sender.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 sender.close();

Method Summary

Modifier and Type Method and Description
Mono<Void> cancelScheduledMessage(long sequenceNumber)

Cancels the enqueuing of a scheduled message, if it was not already enqueued.

Mono<Void> cancelScheduledMessages(Iterable<Long> sequenceNumbers)

Cancels the enqueuing of an already scheduled message, if it was not already enqueued.

void close()

Disposes of the ServiceBusSenderAsyncClient.

Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext)

Commits the transaction given ServiceBusTransactionContext.

Mono<ServiceBusMessageBatch> createMessageBatch()

Creates a ServiceBusMessageBatch that can fit as many messages as the transport allows.

Mono<ServiceBusMessageBatch> createMessageBatch(CreateMessageBatchOptions options)

Creates an ServiceBusMessageBatch configured with the options specified.

Mono<ServiceBusTransactionContext> createTransaction()

Starts a new transaction on Service Bus.

String getEntityPath()

Gets the name of the Service Bus resource.

String getFullyQualifiedNamespace()

Gets the fully qualified namespace.

String getIdentifier()

Gets the identifier of the instance of ServiceBusSenderAsyncClient.

Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext)

Rollbacks the transaction given ServiceBusTransactionContext.

Mono<Long> scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime)

Sends a scheduled message to the Azure Service Bus entity this sender is connected to.

Mono<Long> scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)

Sends a scheduled message to the Azure Service Bus entity this sender is connected to.

Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime)

Sends a batch of scheduled messages to the Azure Service Bus entity this sender is connected to.

Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)

Sends a scheduled messages to the Azure Service Bus entity this sender is connected to.

Mono<Void> sendMessage(ServiceBusMessage message)

Sends a message to a Service Bus queue or topic.

Mono<Void> sendMessage(ServiceBusMessage message, ServiceBusTransactionContext transactionContext)

Sends a message to a Service Bus queue or topic.

Mono<Void> sendMessages(ServiceBusMessageBatch batch)

Sends a message batch to the Azure Service Bus entity this sender is connected to.

Mono<Void> sendMessages(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext)

Sends a message batch to the Azure Service Bus entity this sender is connected to.

Mono<Void> sendMessages(Iterable<ServiceBusMessage> messages)

Sends a set of messages to a Service Bus queue or topic using a batched approach.

Mono<Void> sendMessages(Iterable<ServiceBusMessage> messages, ServiceBusTransactionContext transactionContext)

Sends a set of messages to a Service Bus queue or topic using a batched approach.

Methods inherited from java.lang.Object

Method Details

cancelScheduledMessage

public Mono cancelScheduledMessage(long sequenceNumber)

Cancels the enqueuing of a scheduled message, if it was not already enqueued.

Parameters:

sequenceNumber - of the scheduled message to cancel.

Returns:

The Mono that finishes this operation on service bus resource.

cancelScheduledMessages

public Mono cancelScheduledMessages(Iterable sequenceNumbers)

Cancels the enqueuing of an already scheduled message, if it was not already enqueued.

Parameters:

sequenceNumbers - of the scheduled messages to cancel.

Returns:

The Mono that finishes this operation on service bus resource.

close

public void close()

Disposes of the ServiceBusSenderAsyncClient. If the client has a dedicated connection, the underlying connection is also closed.

commitTransaction

public Mono commitTransaction(ServiceBusTransactionContext transactionContext)

Commits the transaction given ServiceBusTransactionContext. This will make a call to Service Bus.

Parameters:

transactionContext - to be committed.

Returns:

The Mono that finishes this operation on Service Bus resource.

createMessageBatch

public Mono createMessageBatch()

Creates a ServiceBusMessageBatch that can fit as many messages as the transport allows.

Returns:

A ServiceBusMessageBatch that can fit as many messages as the transport allows.

createMessageBatch

public Mono createMessageBatch(CreateMessageBatchOptions options)

Creates an ServiceBusMessageBatch configured with the options specified.

Parameters:

options - A set of options used to configure the ServiceBusMessageBatch.

Returns:

A new ServiceBusMessageBatch configured with the given options.

createTransaction

public Mono createTransaction()

Starts a new transaction on Service Bus. The ServiceBusTransactionContext should be passed along with ServiceBusReceivedMessage all operations that needs to be in this transaction.

Returns:

getEntityPath

public String getEntityPath()

Gets the name of the Service Bus resource.

Returns:

The name of the Service Bus resource.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Gets the fully qualified namespace.

Returns:

The fully qualified namespace.

getIdentifier

public String getIdentifier()

Gets the identifier of the instance of ServiceBusSenderAsyncClient.

Returns:

The identifier that can identify the instance of ServiceBusSenderAsyncClient.

rollbackTransaction

public Mono rollbackTransaction(ServiceBusTransactionContext transactionContext)

Rollbacks the transaction given ServiceBusTransactionContext. This will make a call to Service Bus.

Parameters:

transactionContext - Transaction to rollback.

Returns:

The Mono that finishes this operation on the Service Bus resource.

scheduleMessage

public Mono scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime)

Sends a scheduled message to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.

Parameters:

message - Message to be sent to the Service Bus Queue.
scheduledEnqueueTime - OffsetDateTime at which the message should appear in the Service Bus queue or topic.

Returns:

The sequence number of the scheduled message which can be used to cancel the scheduling of the message.

scheduleMessage

public Mono scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)

Sends a scheduled message to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.

Parameters:

message - Message to be sent to the Service Bus Queue.
scheduledEnqueueTime - OffsetDateTime at which the message should appear in the Service Bus queue or topic.
transactionContext - to be set on message before sending to Service Bus.

Returns:

The sequence number of the scheduled message which can be used to cancel the scheduling of the message.

scheduleMessages

public Flux scheduleMessages(Iterable messages, OffsetDateTime scheduledEnqueueTime)

Sends a batch of scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.

Parameters:

messages - Messages to be sent to the Service Bus queue or topic.
scheduledEnqueueTime - OffsetDateTime at which the message should appear in the Service Bus queue or topic.

Returns:

Sequence numbers of the scheduled messages which can be used to cancel the messages.

scheduleMessages

public Flux scheduleMessages(Iterable messages, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)

Sends a scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.

Parameters:

messages - Messages to be sent to the Service Bus Queue.
scheduledEnqueueTime - OffsetDateTime at which the messages should appear in the Service Bus queue or topic.
transactionContext - Transaction to associate with the operation.

Returns:

Sequence numbers of the scheduled messages which can be used to cancel the messages.

sendMessage

public Mono sendMessage(ServiceBusMessage message)

Sends a message to a Service Bus queue or topic.

Parameters:

message - Message to be sent to Service Bus queue or topic.

Returns:

The Mono the finishes this operation on service bus resource.

sendMessage

public Mono sendMessage(ServiceBusMessage message, ServiceBusTransactionContext transactionContext)

Sends a message to a Service Bus queue or topic.

Parameters:

message - Message to be sent to Service Bus queue or topic.
transactionContext - to be set on batch message before sending to Service Bus.

Returns:

The Mono the finishes this operation on service bus resource.

sendMessages

public Mono sendMessages(ServiceBusMessageBatch batch)

Sends a message batch to the Azure Service Bus entity this sender is connected to.

Parameters:

batch - of messages which allows client to send maximum allowed size for a batch of messages.

Returns:

A Mono the finishes this operation on service bus resource.

sendMessages

public Mono sendMessages(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext)

Sends a message batch to the Azure Service Bus entity this sender is connected to.

Parameters:

batch - of messages which allows client to send maximum allowed size for a batch of messages.
transactionContext - to be set on batch message before sending to Service Bus.

Returns:

A Mono the finishes this operation on service bus resource.

sendMessages

public Mono sendMessages(Iterable messages)

Sends a set of messages to a Service Bus queue or topic using a batched approach. If the size of messages exceed the maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message size is the max amount allowed on the link.

Parameters:

messages - Messages to be sent to Service Bus queue or topic.

Returns:

A Mono that completes when all messages have been sent to the Service Bus resource.

sendMessages

public Mono sendMessages(Iterable messages, ServiceBusTransactionContext transactionContext)

Sends a set of messages to a Service Bus queue or topic using a batched approach. If the size of messages exceed the maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message size is the max amount allowed on the link.

Parameters:

messages - Messages to be sent to Service Bus queue or topic.
transactionContext - to be set on batch message before sending to Service Bus.

Returns:

A Mono that completes when all messages have been sent to the Service Bus resource.

Applies to