ServiceBusProcessorClient Klasse
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusProcessorClient
- com.
Implementiert
public final class ServiceBusProcessorClient
implements AutoCloseable
Der Prozessorclient für die Verarbeitung von Service Bus-Nachrichten. ServiceBusProcessorClient stellt einen pushbasierten Mechanismus bereit, der den Nachrichtenverarbeitungsrückruf beim Empfang einer Nachricht oder den Fehlerhandler aufruft, wenn beim Empfangen von Nachrichten ein Fehler auftritt. Ein ServiceBusProcessorClient kann erstellt werden, um Nachrichten für eine sitzungsfähige oder nicht sitzungsfähige Service Bus-Entität zu verarbeiten. Standardmäßig wird die automatische Abrechnung von Nachrichten unterstützt.
Beispielcode zum Instanziieren eines Prozessorclients und Empfangen im PeekLock-Modus
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (RuntimeException error) {
System.out.printf("Completion of the message %s failed.%n Error: %s%n",
message.getMessageId(), error);
}
} else {
try {
context.abandon();
} catch (RuntimeException error) {
System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
message.getMessageId(), error);
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
Beispielcode zum Instanziieren eines Prozessorclients und empfangen im ReceiveAndDelete-Modus
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
Erstellen und Ausführen eines sitzungsfähigen Prozessors
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
Consumer<ServiceBusErrorContext> onError = context -> {
System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
context.getFullyQualifiedNamespace(), context.getEntityPath());
if (context.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) context.getException();
System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", context.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.sessionProcessor()
.queueName(sessionEnabledQueueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete()
.maxConcurrentSessions(2)
.processMessage(onMessage)
.processError(onError)
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
sessionProcessor.start();
// Stop processor and dispose when done processing messages.
sessionProcessor.stop();
sessionProcessor.close();
Methodenzusammenfassung
Modifizierer und Typ | Methode und Beschreibung |
---|---|
synchronized void |
close()
Beendet die Nachrichtenverarbeitung und schließt den Prozessor. |
synchronized String |
getIdentifier()
Ruft den Bezeichner des instance von abServiceBusProcessorClient. |
String |
getQueueName()
Gibt den Warteschlangennamen zurück, der diesem instance von ServiceBusProcessorClientzugeordnet ist. |
String |
getSubscriptionName()
Gibt den Abonnementnamen zurück, der diesem instance von ServiceBusProcessorClientzugeordnet ist. |
String |
getTopicName()
Gibt den Themennamen zurück, der diesem instance von ServiceBusProcessorClientzugeordnet ist. |
synchronized boolean |
isRunning()
Gibt zurück |
synchronized void |
start()
Startet den Prozessor im Hintergrund. |
synchronized void |
stop()
Beendet die Nachrichtenverarbeitung für diesen Prozessor. |
Geerbte Methoden von java.lang.Object
Details zur Methode
close
public synchronized void close()
Beendet die Nachrichtenverarbeitung und schließt den Prozessor. Die empfangenden Links und Sitzungen werden geschlossen, und beim Aufrufen start() wird ein neuer Verarbeitungszyklus mit neuen Links und neuen Sitzungen erstellt.
getIdentifier
public synchronized String getIdentifier()
Ruft den Bezeichner des instance von abServiceBusProcessorClient.
Returns:
getQueueName
public String getQueueName()
Gibt den Warteschlangennamen zurück, der diesem instance von ServiceBusProcessorClientzugeordnet ist.
Returns:
null
, wenn der Prozessor instance für ein Thema und ein Abonnement gilt.getSubscriptionName
public String getSubscriptionName()
Gibt den Abonnementnamen zurück, der diesem instance von ServiceBusProcessorClientzugeordnet ist.
Returns:
null
, wenn der Prozessor instance für eine Warteschlange ist.getTopicName
public String getTopicName()
Gibt den Themennamen zurück, der diesem instance von ServiceBusProcessorClientzugeordnet ist.
Returns:
null
, wenn der Prozessor instance für eine Warteschlange ist.isRunning
public synchronized boolean isRunning()
Gibt zurück true
, wenn der Prozessor ausgeführt wird. Wenn der Prozessor beendet oder geschlossen wird, gibt diese Methode zurück false
.
Returns:
true
, wenn der Prozessor ausgeführt wird; false
Andernfalls.start
public synchronized void start()
Startet den Prozessor im Hintergrund. Wenn diese Methode aufgerufen wird, initiiert der Prozessor einen Nachrichtenempfänger, der den Nachrichtenhandler aufruft, wenn neue Nachrichten verfügbar sind. Diese Methode ist idempotent (dh das erneute Aufrufen start()
, nachdem der Prozessor bereits ausgeführt wird, ist ein no-op).
Beim Aufrufen start()
nach dem Aufruf stop() wird die Verarbeitung von Nachrichten mit derselben zugrunde liegenden Verbindung fortgesetzt.
Beim Aufrufen start()
nach dem Aufruf close() wird der Prozessor mit einer neuen Verbindung gestartet.
stop
public synchronized void stop()
Beendet die Nachrichtenverarbeitung für diesen Prozessor. Die empfangenden Links und Sitzungen werden aktiv gehalten, und dieser Prozessor kann die Verarbeitung von Nachrichten fortsetzen, indem er erneut aufruft start() .
Gilt für:
Azure SDK for Java