Freigeben über


ServiceBusProcessorClient Klasse

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusProcessorClient

Implementiert

public final class ServiceBusProcessorClient
implements AutoCloseable

Der Prozessorclient für die Verarbeitung von Service Bus-Nachrichten. ServiceBusProcessorClient stellt einen pushbasierten Mechanismus bereit, der den Nachrichtenverarbeitungsrückruf beim Empfang einer Nachricht oder den Fehlerhandler aufruft, wenn beim Empfangen von Nachrichten ein Fehler auftritt. Ein ServiceBusProcessorClient kann erstellt werden, um Nachrichten für eine sitzungsfähige oder nicht sitzungsfähige Service Bus-Entität zu verarbeiten. Standardmäßig wird die automatische Abrechnung von Nachrichten unterstützt.

Beispielcode zum Instanziieren eines Prozessorclients und Empfangen im PeekLock-Modus

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
     // handling message reaches desired state such that it doesn't require Service Bus to redeliver
     // the same message, then context.complete() should be called otherwise context.abandon().
     final boolean success = Math.random() < 0.5;
     if (success) {
         try {
             context.complete();
         } catch (RuntimeException error) {
             System.out.printf("Completion of the message %s failed.%n Error: %s%n",
                 message.getMessageId(), error);
         }
     } else {
         try {
             context.abandon();
         } catch (RuntimeException error) {
             System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     }
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()  // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

Beispielcode zum Instanziieren eines Prozessorclients und empfangen im ReceiveAndDelete-Modus

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

Erstellen und Ausführen eines sitzungsfähigen Prozessors

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
     ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 Consumer<ServiceBusErrorContext> onError = context -> {
     System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
         context.getFullyQualifiedNamespace(), context.getEntityPath());

     if (context.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) context.getException();

         System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", context.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .sessionProcessor()
     .queueName(sessionEnabledQueueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()
     .maxConcurrentSessions(2)
     .processMessage(onMessage)
     .processError(onError)
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 sessionProcessor.start();

 // Stop processor and dispose when done processing messages.
 sessionProcessor.stop();
 sessionProcessor.close();

Methodenzusammenfassung

Modifizierer und Typ Methode und Beschreibung
synchronized void close()

Beendet die Nachrichtenverarbeitung und schließt den Prozessor.

synchronized String getIdentifier()

Ruft den Bezeichner des instance von abServiceBusProcessorClient.

String getQueueName()

Gibt den Warteschlangennamen zurück, der diesem instance von ServiceBusProcessorClientzugeordnet ist.

String getSubscriptionName()

Gibt den Abonnementnamen zurück, der diesem instance von ServiceBusProcessorClientzugeordnet ist.

String getTopicName()

Gibt den Themennamen zurück, der diesem instance von ServiceBusProcessorClientzugeordnet ist.

synchronized boolean isRunning()

Gibt zurück true , wenn der Prozessor ausgeführt wird.

synchronized void start()

Startet den Prozessor im Hintergrund.

synchronized void stop()

Beendet die Nachrichtenverarbeitung für diesen Prozessor.

Geerbte Methoden von java.lang.Object

Details zur Methode

close

public synchronized void close()

Beendet die Nachrichtenverarbeitung und schließt den Prozessor. Die empfangenden Links und Sitzungen werden geschlossen, und beim Aufrufen start() wird ein neuer Verarbeitungszyklus mit neuen Links und neuen Sitzungen erstellt.

getIdentifier

public synchronized String getIdentifier()

Ruft den Bezeichner des instance von abServiceBusProcessorClient.

Returns:

Der Bezeichner, der die instance von ServiceBusProcessorClientidentifizieren kann.

getQueueName

public String getQueueName()

Gibt den Warteschlangennamen zurück, der diesem instance von ServiceBusProcessorClientzugeordnet ist.

Returns:

der diesem instance zugeordnete Warteschlangenname von ServiceBusProcessorClient odernull, wenn der Prozessor instance für ein Thema und ein Abonnement gilt.

getSubscriptionName

public String getSubscriptionName()

Gibt den Abonnementnamen zurück, der diesem instance von ServiceBusProcessorClientzugeordnet ist.

Returns:

der Abonnementname, der diesem instance von ServiceBusProcessorClient zugeordnet ist, oder null , wenn der Prozessor instance für eine Warteschlange ist.

getTopicName

public String getTopicName()

Gibt den Themennamen zurück, der diesem instance von ServiceBusProcessorClientzugeordnet ist.

Returns:

der Diesem instance zugeordnete Themenname von ServiceBusProcessorClient odernull, wenn der Prozessor instance für eine Warteschlange ist.

isRunning

public synchronized boolean isRunning()

Gibt zurück true , wenn der Prozessor ausgeführt wird. Wenn der Prozessor beendet oder geschlossen wird, gibt diese Methode zurück false.

Returns:

true , wenn der Prozessor ausgeführt wird; false Andernfalls.

start

public synchronized void start()

Startet den Prozessor im Hintergrund. Wenn diese Methode aufgerufen wird, initiiert der Prozessor einen Nachrichtenempfänger, der den Nachrichtenhandler aufruft, wenn neue Nachrichten verfügbar sind. Diese Methode ist idempotent (dh das erneute Aufrufen start() , nachdem der Prozessor bereits ausgeführt wird, ist ein no-op).

Beim Aufrufen start() nach dem Aufruf stop() wird die Verarbeitung von Nachrichten mit derselben zugrunde liegenden Verbindung fortgesetzt.

Beim Aufrufen start() nach dem Aufruf close() wird der Prozessor mit einer neuen Verbindung gestartet.

stop

public synchronized void stop()

Beendet die Nachrichtenverarbeitung für diesen Prozessor. Die empfangenden Links und Sitzungen werden aktiv gehalten, und dieser Prozessor kann die Verarbeitung von Nachrichten fortsetzen, indem er erneut aufruft start() .

Gilt für: