Partager via


ServiceBusSessionReceiverAsyncClient Classe

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient

Implémente

public final class ServiceBusSessionReceiverAsyncClient
implements AutoCloseable

Ce client de récepteur de session asynchrone est utilisé pour acquérir des verrous de session à partir d’une file d’attente ou d’une rubrique et créer ServiceBusReceiverAsyncClient des instances liées aux sessions verrouillées. Les sessions peuvent être utilisées comme traitement FIFO (premier entré, premier sorti) des messages. Les files d’attente et les rubriques/abonnements prennent en charge les sessions Service Bus. Toutefois, il doit être activé au moment de la création de l’entité.

Les exemples présentés dans ce document utilisent un objet d’informations d’identification nommé DefaultAzureCredential pour l’authentification, qui convient à la plupart des scénarios, notamment aux environnements de développement et de production locaux. En outre, nous vous recommandons d’utiliser une identité managée pour l’authentification dans les environnements de production. Vous trouverez plus d’informations sur les différentes méthodes d’authentification et leurs types d’informations d’identification correspondants dans la documentation Azure Identity.

Exemple : recevoir des messages d’une session spécifique

Utilisez acceptSession(String sessionId) pour acquérir le verrou d’une session si vous connaissez l’ID de session. PEEK_LOCK Il disableAutoComplete() est vivement recommandé aux utilisateurs de contrôler le règlement des messages.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

Exemple : Recevoir des messages de la première session disponible

Utilisez acceptNextSession() pour acquérir le verrou de la session disponible suivante sans spécifier l’ID de session.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

Résumé de la méthode

Modificateur et type Méthode et description
Mono<ServiceBusReceiverAsyncClient> acceptNextSession()

Acquiert un verrou de session pour la session disponible suivante et crée un ServiceBusReceiverAsyncClient pour recevoir des messages de la session.

Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId)

Acquiert un verrou de session pour et crée un ServiceBusReceiverAsyncClient pour sessionId recevoir des messages de la session.

void close()

Méthodes héritées de java.lang.Object

Détails de la méthode

acceptNextSession

public Mono acceptNextSession()

Acquiert un verrou de session pour la session disponible suivante et crée un ServiceBusReceiverAsyncClient pour recevoir des messages de la session. Il attend qu’une session soit disponible si aucune n’est immédiatement disponible.

Returns:

ServiceBusReceiverAsyncClient qui est lié à la session disponible.

acceptSession

public Mono acceptSession(String sessionId)

Acquiert un verrou de session pour et crée un ServiceBusReceiverAsyncClient pour sessionId recevoir des messages de la session. Si la session est déjà verrouillée par un autre client, une AmqpException est levée.

Parameters:

sessionId - ID de session.

Returns:

ServiceBusReceiverAsyncClient qui est lié à la session spécifiée.

close

public void close()

S’applique à