Compartilhar via


ServiceBusSessionReceiverAsyncClient Classe

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient

Implementações

public final class ServiceBusSessionReceiverAsyncClient
implements AutoCloseable

Esse cliente receptor de sessão assíncrona é usado para adquirir bloqueios de sessão de uma fila ou tópico e criar ServiceBusReceiverAsyncClient instâncias vinculadas às sessões bloqueadas. As sessões podem ser usadas como um primeiro processamento fifo (primeiro a entrar e sair) de mensagens. Filas e tópicos/assinaturas dão suporte a sessões do Barramento de Serviço, no entanto, ele deve ser habilitado no momento da criação da entidade.

Os exemplos mostrados neste documento usam um objeto de credencial chamado DefaultAzureCredential para autenticação, que é apropriado para a maioria dos cenários, incluindo ambientes locais de desenvolvimento e produção. Além disso, recomendamos usar a identidade gerenciada para autenticação em ambientes de produção. Você pode encontrar mais informações sobre diferentes maneiras de autenticação e seus tipos de credencial correspondentes na documentação da Identidade do Azure".

Exemplo: receber mensagens de uma sessão específica

Use acceptSession(String sessionId) para adquirir o bloqueio de uma sessão se você souber a ID da sessão e PEEK_LOCKdisableAutoComplete() for altamente recomendado para que os usuários tenham controle sobre a liquidação de mensagens.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

Exemplo: receber mensagens da primeira sessão disponível

Use acceptNextSession() para adquirir o bloqueio da próxima sessão disponível sem especificar a ID da sessão.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

Resumo do método

Modificador e tipo Método e descrição
Mono<ServiceBusReceiverAsyncClient> acceptNextSession()

Adquire um bloqueio de sessão para a próxima sessão disponível e cria um ServiceBusReceiverAsyncClient para receber mensagens da sessão.

Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId)

Adquire um bloqueio de sessão para sessionId e crie um ServiceBusReceiverAsyncClient para receber mensagens da sessão.

void close()

Métodos herdados de java.lang.Object

Detalhes do método

acceptNextSession

public Mono acceptNextSession()

Adquire um bloqueio de sessão para a próxima sessão disponível e cria um ServiceBusReceiverAsyncClient para receber mensagens da sessão. Ele aguardará até que uma sessão esteja disponível se nenhuma estiver disponível imediatamente.

Returns:

Um ServiceBusReceiverAsyncClient que está vinculado à sessão disponível.

acceptSession

public Mono acceptSession(String sessionId)

Adquire um bloqueio de sessão para sessionId e crie um ServiceBusReceiverAsyncClient para receber mensagens da sessão. Se a sessão já estiver bloqueada por outro cliente, um AmqpException será gerado.

Parameters:

sessionId - A ID da sessão.

Returns:

Um ServiceBusReceiverAsyncClient que está vinculado à sessão especificada.

close

public void close()

Aplica-se a