ServiceBusSessionReceiverAsyncClient Clase
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusSessionReceiverAsyncClient
- com.
Implementaciones
public final class ServiceBusSessionReceiverAsyncClient
implements AutoCloseable
Este cliente receptor de sesión asincrónico se usa para adquirir bloqueos de sesión de una cola o tema y crear ServiceBusReceiverAsyncClient instancias vinculadas a las sesiones bloqueadas. Las sesiones se pueden usar como una primera entrada, primero en salir (FIFO) procesamiento de mensajes. Las colas y temas o suscripciones admiten sesiones de Service Bus; sin embargo, se debe habilitar en el momento de la creación de la entidad.
Los ejemplos que se muestran en este documento usan un objeto de credencial denominado DefaultAzureCredential para la autenticación, que es adecuado para la mayoría de los escenarios, incluidos los entornos de desarrollo y producción locales. Además, se recomienda usar la identidad administrada para la autenticación en entornos de producción. Puede encontrar más información sobre las distintas formas de autenticación y sus tipos de credenciales correspondientes en la documentación de Identidad de Azure".
Ejemplo: Recibir mensajes de una sesión específica
Use acceptSession(String sessionId) para adquirir el bloqueo de una sesión si conoce el identificador de sesión y PEEK_LOCKdisableAutoComplete() se recomienda encarecidamente para que los usuarios tengan control sobre la liquidación de mensajes.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
// successfully locked.
// `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
// operations complete.
// `Mono.usingWhen` can also be used if the resource closure returns a single item.
Flux<Void> sessionMessages = Flux.usingWhen(
sessionReceiver.acceptSession("<<my-session-id>>"),
receiver -> {
// Receive messages from <<my-session-id>> session.
return receiver.receiveMessages().flatMap(message -> {
System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
});
},
receiver -> Mono.fromRunnable(() -> {
// Dispose of resources.
receiver.close();
sessionReceiver.close();
}));
// When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
// is non-blocking and kicks off the operation.
Disposable subscription = sessionMessages.subscribe(
unused -> {
}, error -> System.err.print("Error receiving message from session: " + error),
() -> System.out.println("Completed receiving from session."));
Ejemplo: Recibir mensajes de la primera sesión disponible
Use acceptNextSession() para adquirir el bloqueo de la siguiente sesión disponible sin especificar el identificador de sesión.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// Creates a client to receive messages from the first available session. It waits until
// AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
// completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
receiver -> receiver.receiveMessages().flatMap(message -> {
System.out.println("Received message: " + message.getBody());
// Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
}),
receiver -> Mono.fromRunnable(() -> {
// Dispose of the receiver and sessionReceiver when done receiving messages.
receiver.close();
sessionReceiver.close();
}));
// This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
// operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
// receiving messages.
Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
Resumen del método
Modificador y tipo | Método y descripción |
---|---|
Mono<Service |
acceptNextSession()
Adquiere un bloqueo de sesión para la siguiente sesión disponible y crea un ServiceBusReceiverAsyncClient para recibir mensajes de la sesión. |
Mono<Service |
acceptSession(String sessionId)
Adquiere un bloqueo de sesión para |
void | close() |
Métodos heredados de java.lang.Object
Detalles del método
acceptNextSession
public Mono
Adquiere un bloqueo de sesión para la siguiente sesión disponible y crea un ServiceBusReceiverAsyncClient para recibir mensajes de la sesión. Esperará hasta que una sesión esté disponible si no hay ninguna disponible inmediatamente.
Returns:
acceptSession
public Mono
Adquiere un bloqueo de sesión para sessionId
y crea un ServiceBusReceiverAsyncClient para recibir mensajes de la sesión. Si la sesión ya está bloqueada por otro cliente, se produce una AmqpException excepción .
Parameters:
Returns:
close
public void close()
Se aplica a
Azure SDK for Java