ServiceBusSenderAsyncClient Clase
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusSenderAsyncClient
- com.
Implementaciones
public final class ServiceBusSenderAsyncClient
implements AutoCloseable
Un cliente asincrónico para enviar mensajes a un recurso de Service Bus.
Los ejemplos que se muestran en este documento usan un objeto de credencial denominado DefaultAzureCredential para la autenticación, que es adecuado para la mayoría de los escenarios, incluidos los entornos de desarrollo y producción locales. Además, se recomienda usar la identidad administrada para la autenticación en entornos de producción. Puede encontrar más información sobre las distintas formas de autenticación y sus tipos de credenciales correspondientes en la documentación de Azure Identity.
Ejemplo: Creación de una instancia del remitente
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusSenderAsyncClient asyncSender = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sender()
.queueName(queueName)
.buildAsyncClient();
// When users are done with the sender, they should dispose of it.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
asyncSender.close();
Ejemplo: Envío de mensajes a un recurso de Service Bus
// `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
// operation. Users should use the callbacks on `subscribe` to understand the status of the send operation.
asyncSender.createMessageBatch().flatMap(batch -> {
batch.tryAddMessage(new ServiceBusMessage("test-1"));
batch.tryAddMessage(new ServiceBusMessage("test-2"));
return asyncSender.sendMessages(batch);
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred while sending batch:" + error);
}, () -> {
System.out.println("Send complete.");
});
Ejemplo: Envío de mensajes con un tamaño limitado ServiceBusMessageBatch a un recurso de Service Bus
Flux<ServiceBusMessage> telemetryMessages = Flux.just(firstMessage, secondMessage);
// Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
// In this case, all the batches created with these options are limited to 256 bytes.
CreateMessageBatchOptions options = new CreateMessageBatchOptions()
.setMaximumSizeInBytes(256);
AtomicReference<ServiceBusMessageBatch> currentBatch = new AtomicReference<>();
// Sends the current batch if it is not null and not empty. If the current batch is null, sets it.
// Returns the batch to work with.
Mono<ServiceBusMessageBatch> sendBatchAndGetCurrentBatchOperation = Mono.defer(() -> {
ServiceBusMessageBatch batch = currentBatch.get();
if (batch == null) {
return asyncSender.createMessageBatch(options);
}
if (batch.getCount() > 0) {
return asyncSender.sendMessages(batch).then(
asyncSender.createMessageBatch(options)
.handle((ServiceBusMessageBatch newBatch, SynchronousSink<ServiceBusMessageBatch> sink) -> {
// Expect that the batch we just sent is the current one. If it is not, there's a race
// condition accessing currentBatch reference.
if (!currentBatch.compareAndSet(batch, newBatch)) {
sink.error(new IllegalStateException(
"Expected that the object in currentBatch was batch. But it is not."));
} else {
sink.next(newBatch);
}
}));
} else {
return Mono.just(batch);
}
});
// The sample Flux contains two messages, but it could be an infinite stream of telemetry messages.
Flux<Void> sendMessagesOperation = telemetryMessages.flatMap(message -> {
return sendBatchAndGetCurrentBatchOperation.flatMap(batch -> {
if (batch.tryAddMessage(message)) {
return Mono.empty();
} else {
return sendBatchAndGetCurrentBatchOperation
.handle((ServiceBusMessageBatch newBatch, SynchronousSink<Void> sink) -> {
if (!newBatch.tryAddMessage(message)) {
sink.error(new IllegalArgumentException(
"Message is too large to fit in an empty batch."));
} else {
sink.complete();
}
});
}
});
});
// `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
// operation. Users should use the callbacks on `subscribe` to understand the status of the send operation.
Disposable disposable = sendMessagesOperation.then(sendBatchAndGetCurrentBatchOperation)
.subscribe(batch -> {
System.out.println("Last batch should be empty: " + batch.getCount());
}, error -> {
System.err.println("Error sending telemetry messages: " + error);
}, () -> {
System.out.println("Completed.");
// Continue using the sender and finally, dispose of the sender.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
asyncSender.close();
});
Ejemplo: Envío de un mensaje a una cola habilitada para sesión
En el fragmento de código siguiente se muestra cómo enviar un mensaje a una cola habilitada para sesiones de Service Bus . Al establecer setMessageId(String messageId) la propiedad en "greetings" se enviará el mensaje a una sesión de Service Bus con un identificador de "greetings".
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
.sender()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
.setSessionId("greetings");
// `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
// operation. Users should use the callbacks on `subscribe` to understand the status of the send operation.
sender.sendMessage(message).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred publishing batch: " + error);
}, () -> {
System.out.println("Send complete.");
});
// Continue using the sender and finally, dispose of the sender.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
sender.close();
Resumen del método
Métodos heredados de java.lang.Object
Detalles del método
cancelScheduledMessage
public Mono
Cancela la puesta en cola de un mensaje programado, si aún no se ha puesto en cola.
Parameters:
Returns:
cancelScheduledMessages
public Mono
Cancela la puesta en cola de un mensaje ya programado, si aún no se ha puesto en cola.
Parameters:
Returns:
close
public void close()
Elimina de ServiceBusSenderAsyncClient. Si el cliente tiene una conexión dedicada, también se cierra la conexión subyacente.
commitTransaction
public Mono
Confirma la transacción dada ServiceBusTransactionContext. Esto realizará una llamada a Service Bus.
Parameters:
Returns:
createMessageBatch
public Mono
Crea un objeto ServiceBusMessageBatch que puede ajustarse a tantos mensajes como permita el transporte.
Returns:
createMessageBatch
public Mono
Crea un ServiceBusMessageBatch objeto configurado con las opciones especificadas.
Parameters:
Returns:
createTransaction
public Mono
Inicia una nueva transacción en Service Bus. ServiceBusTransactionContext Debe pasarse junto con ServiceBusReceivedMessage todas las operaciones que deben estar en esta transacción.
Returns:
getEntityPath
public String getEntityPath()
Obtiene el nombre del recurso de Service Bus.
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
Obtiene el espacio de nombres completo.
Returns:
getIdentifier
public String getIdentifier()
Obtiene el identificador de la instancia de ServiceBusSenderAsyncClient.
Returns:
rollbackTransaction
public Mono
Revierte la transacción dada ServiceBusTransactionContext. Esto realizará una llamada a Service Bus.
Parameters:
Returns:
scheduleMessage
public Mono
Envía un mensaje programado a la entidad Azure Service Bus a la que está conectado este remitente. Se pone en cola un mensaje programado y se pone a disposición de los receptores solo en el momento de la puesta en cola programada.
Parameters:
Returns:
scheduleMessage
public Mono
Envía un mensaje programado a la entidad Azure Service Bus a la que está conectado este remitente. Se pone en cola un mensaje programado y se pone a disposición de los receptores solo en la hora de puesta en cola programada.
Parameters:
Returns:
scheduleMessages
public Flux
Envía un lote de mensajes programados a la entidad Azure Service Bus a la que está conectado este remitente. Se pone en cola un mensaje programado y se pone a disposición de los receptores solo en la hora de puesta en cola programada.
Parameters:
Returns:
scheduleMessages
public Flux
Envía mensajes programados a la entidad Azure Service Bus a la que está conectado este remitente. Se pone en cola un mensaje programado y se pone a disposición de los receptores solo en la hora de puesta en cola programada.
Parameters:
Returns:
sendMessage
public Mono
Envía un mensaje a una cola o tema de Service Bus.
Parameters:
Returns:
sendMessage
public Mono
Envía un mensaje a una cola o tema de Service Bus.
Parameters:
Returns:
sendMessages
public Mono
Envía un lote de mensajes a la entidad Azure Service Bus a la que está conectado este remitente.
Parameters:
Returns:
sendMessages
public Mono
Envía un lote de mensajes a la entidad Azure Service Bus a la que está conectado este remitente.
Parameters:
Returns:
sendMessages
public Mono
Envía un conjunto de mensajes a una cola o tema de Service Bus mediante un enfoque por lotes. Si el tamaño de los mensajes supera el tamaño máximo de un solo lote, se desencadenará una excepción y se producirá un error en el envío. De forma predeterminada, el tamaño del mensaje es la cantidad máxima permitida en el vínculo.
Parameters:
Returns:
sendMessages
public Mono
Envía un conjunto de mensajes a una cola o tema de Service Bus mediante un enfoque por lotes. Si el tamaño de los mensajes supera el tamaño máximo de un solo lote, se desencadenará una excepción y se producirá un error en el envío. De forma predeterminada, el tamaño del mensaje es la cantidad máxima permitida en el vínculo.
Parameters:
Returns:
Se aplica a
Azure SDK for Java