Partager via


ServiceBusProcessorClient Classe

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusProcessorClient

Implémente

public final class ServiceBusProcessorClient
implements AutoCloseable

Client processeur pour le traitement des messages Service Bus. ServiceBusProcessorClient fournit un mécanisme basé sur l’envoi (push) qui appelle le rappel de traitement des messages lorsqu’un message est reçu ou le gestionnaire d’erreurs lorsqu’une erreur se produit lors de la réception de messages. Un ServiceBusProcessorClient peut être créé pour traiter les messages d’une entité Service Bus activée ou non pour la session. Il prend en charge le règlement automatique des messages par défaut.

Exemple de code pour instancier un client processeur et recevoir en mode PeekLock

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
     // handling message reaches desired state such that it doesn't require Service Bus to redeliver
     // the same message, then context.complete() should be called otherwise context.abandon().
     final boolean success = Math.random() < 0.5;
     if (success) {
         try {
             context.complete();
         } catch (RuntimeException error) {
             System.out.printf("Completion of the message %s failed.%n Error: %s%n",
                 message.getMessageId(), error);
         }
     } else {
         try {
             context.abandon();
         } catch (RuntimeException error) {
             System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     }
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()  // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

Exemple de code pour instancier un client de processeur et recevoir en mode ReceiveAndDelete

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

Créer et exécuter un processeur activé pour la session

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
     ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 Consumer<ServiceBusErrorContext> onError = context -> {
     System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
         context.getFullyQualifiedNamespace(), context.getEntityPath());

     if (context.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) context.getException();

         System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", context.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .sessionProcessor()
     .queueName(sessionEnabledQueueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()
     .maxConcurrentSessions(2)
     .processMessage(onMessage)
     .processError(onError)
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 sessionProcessor.start();

 // Stop processor and dispose when done processing messages.
 sessionProcessor.stop();
 sessionProcessor.close();

Résumé de la méthode

Modificateur et type Méthode et description
synchronized void close()

Arrête le traitement des messages et ferme le processeur.

synchronized String getIdentifier()

Obtient l’identificateur du instance de ServiceBusProcessorClient.

String getQueueName()

Retourne le nom de file d’attente associé à cette instance de ServiceBusProcessorClient.

String getSubscriptionName()

Retourne le nom d’abonnement associé à cette instance de ServiceBusProcessorClient.

String getTopicName()

Retourne le nom de la rubrique associé à cette instance de ServiceBusProcessorClient.

synchronized boolean isRunning()

Retourne true si le processeur est en cours d’exécution.

synchronized void start()

Démarre le processeur en arrière-plan.

synchronized void stop()

Arrête le traitement des messages pour ce processeur.

Méthodes héritées de java.lang.Object

Détails de la méthode

close

public synchronized void close()

Arrête le traitement des messages et ferme le processeur. Les liens et sessions de réception sont fermés et les appels start() créent un nouveau cycle de traitement avec de nouveaux liens et de nouvelles sessions.

getIdentifier

public synchronized String getIdentifier()

Obtient l’identificateur du instance de ServiceBusProcessorClient.

Returns:

Identificateur qui peut identifier le instance de ServiceBusProcessorClient.

getQueueName

public String getQueueName()

Retourne le nom de file d’attente associé à cette instance de ServiceBusProcessorClient.

Returns:

nom de file d’attente associé à cette instance de ServiceBusProcessorClient ou null si le instance processeur est destiné à une rubrique et un abonnement.

getSubscriptionName

public String getSubscriptionName()

Retourne le nom d’abonnement associé à cette instance de ServiceBusProcessorClient.

Returns:

nom d’abonnement associé à cette instance de ServiceBusProcessorClient ou null si le instance processeur est destiné à une file d’attente.

getTopicName

public String getTopicName()

Retourne le nom de la rubrique associé à cette instance de ServiceBusProcessorClient.

Returns:

le nom de la rubrique associé à cette instance de ServiceBusProcessorClient ou null si le instance processeur est destiné à une file d’attente.

isRunning

public synchronized boolean isRunning()

Retourne true si le processeur est en cours d’exécution. Si le processeur est arrêté ou fermé, cette méthode retourne false.

Returns:

true si le processeur est en cours d’exécution ; false Sinon.

start

public synchronized void start()

Démarre le processeur en arrière-plan. Lorsque cette méthode est appelée, le processeur lance un récepteur de messages qui appelle le gestionnaire de messages lorsque de nouveaux messages sont disponibles. Cette méthode est idempotente (c’est-à-dire qu’appeler start() à nouveau une fois que le processeur est déjà en cours d’exécution est une opération sans opération).

L’appel start() après l’appel stop() reprend le traitement des messages à l’aide de la même connexion sous-jacente.

L’appel start() après l’appel close() démarre le processeur avec une nouvelle connexion.

stop

public synchronized void stop()

Arrête le traitement des messages pour ce processeur. Les liens et les sessions de réception sont maintenus actifs et ce processeur peut reprendre le traitement des messages en appelant start() à nouveau.

S’applique à