ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder Klasse
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusClientBuilder. ServiceBusSessionProcessorClientBuilder
- com.
public final class ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder
Generator zum Erstellen ServiceBusProcessorClient von Nachrichten aus einer sitzungsbasierten Service Bus-Entität. ServiceBusProcessorClient verarbeitet Nachrichten und Fehler über processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage) und processError(Consumer<ServiceBusErrorContext> processError). Wenn der Prozessor die Verarbeitung einer Sitzung beendet hat, versucht er, die nächste zu verarbeitende Sitzung abzurufen.
Standardmäßig ist der Prozessor wie folgt:
- Automatisches Abgleichen von Nachrichten. Deaktiviert über disableAutoComplete()
- Verarbeitet 1 Sitzung gleichzeitig. Konfiguriert über maxConcurrentSessions(int maxConcurrentSessions)
- Ruft 1 instance von aufprocessMessage(Consumer<ServiceBusReceivedMessageContext> processMessage). Konfiguriert über maxConcurrentCalls(int maxConcurrentCalls)
Instanziieren eines sitzungsfähigen Prozessorclients
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
Consumer<ServiceBusErrorContext> onError = context -> {
System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
context.getFullyQualifiedNamespace(), context.getEntityPath());
if (context.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) context.getException();
System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", context.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.sessionProcessor()
.queueName(sessionEnabledQueueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete()
.maxConcurrentSessions(2)
.processMessage(onMessage)
.processError(onError)
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
sessionProcessor.start();
// Stop processor and dispose when done processing messages.
sessionProcessor.stop();
sessionProcessor.close();
Methodenzusammenfassung
Modifizierer und Typ | Methode und Beschreibung |
---|---|
Service |
buildProcessorClient()
Erstellt einen sitzungsfähigen Service Bus-Prozessor , der für das Lesen ServiceBusReceivedMessage aus einer bestimmten Warteschlange oder einem bestimmten Abonnement zuständig ist. |
Service |
disableAutoComplete()
Deaktiviert die automatische Vervollständigung und automatische Verwerfen empfangener Nachrichten. |
Service |
maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)
Legt fest, wie lange die automatische Verlängerung der Sperre fortgesetzt werden soll. |
Service |
maxConcurrentCalls(int maxConcurrentCalls)
Max. gleichzeitige Nachrichten, die von diesem Prozessor verarbeitet werden sollen. |
Service |
maxConcurrentSessions(int maxConcurrentSessions)
Ermöglicht den Rollover für die Sitzungsverarbeitung, indem höchstens |
Service |
prefetchCount(int prefetchCount)
Legt die Anzahl der Vorabrufe des Prozessors fest. |
Service |
processError(Consumer<ServiceBusErrorContext> processError)
Der Fehlerhandler für den Prozessor, der bei einem Fehler beim Empfangen von Meldungen aufgerufen wird. |
Service |
processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage)
Der Nachrichtenverarbeitungsrückruf für den Prozessor, der ausgeführt wird, wenn eine Nachricht empfangen wird. |
Service |
queueName(String queueName)
Legt den Namen der Warteschlange fest, für die ein Prozessor erstellt werden soll. |
Service |
receiveMode(ServiceBusReceiveMode receiveMode)
Legt den Empfangsmodus für den Prozessor fest. |
Service |
sessionIdleTimeout(Duration sessionIdleTimeout)
Legt fest, wie lange maximal auf den Empfang einer Nachricht für die aktuell aktive Sitzung gewartet werden soll. |
Service |
subQueue(SubQueue subQueue)
Legt den Typ des fest, mit dem eine SubQueue Verbindung hergestellt werden soll. |
Service |
subscriptionName(String subscriptionName)
Legt den Namen des Abonnements im Thema fest, auf das lauscht werden soll. |
Service |
topicName(String topicName)
Legt den Namen des Themas fest. |
Geerbte Methoden von java.lang.Object
Details zur Methode
buildProcessorClient
public ServiceBusProcessorClient buildProcessorClient()
Erstellt einen sitzungsfähigen Service Bus-Prozessor , der für das Lesen ServiceBusReceivedMessage aus einer bestimmten Warteschlange oder einem bestimmten Abonnement zuständig ist.
Returns:
disableAutoComplete
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder disableAutoComplete()
Deaktiviert die automatische Vervollständigung und automatische Verwerfen empfangener Nachrichten. Standardmäßig lautet complete()eine erfolgreich verarbeitete Nachricht . Wenn bei der Verarbeitung der Nachricht ein Fehler auftritt, lautet dies abandon().
Returns:
maxAutoLockRenewDuration
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)
Legt fest, wie lange die automatische Verlängerung der Sperre fortgesetzt werden soll. Durch Festlegen Duration#ZERO oder null
Deaktivieren der automatischen Verlängerung. Für RECEIVE_AND_DELETE den Modus ist die automatische Verlängerung deaktiviert.
Parameters:
null
gibt an, dass die automatische Verlängerung deaktiviert ist.
Returns:
maxConcurrentCalls
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)
Max. gleichzeitige Nachrichten, die von diesem Prozessor verarbeitet werden sollen.
Parameters:
Returns:
maxConcurrentSessions
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConcurrentSessions)
Ermöglicht den Rollover für die Sitzungsverarbeitung, indem höchstens maxConcurrentSessions
verarbeitet wird.
Parameters:
Returns:
prefetchCount
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder prefetchCount(int prefetchCount)
Legt die Anzahl der Vorabrufe des Prozessors fest. Sowohl für den Modus als auch PEEK_LOCKRECEIVE_AND_DELETE für den Modus ist der Standardwert 0. Der Vorabruf beschleunigt den Nachrichtenfluss, indem versucht wird, dass eine Nachricht jederzeit für den lokalen Abruf verfügbar ist, wenn und bevor die Anwendung den Prozessor startet. Wenn Sie einen Wert ungleich 0 (null) festlegen, wird diese Anzahl von Nachrichten vorab abgerufen. Wenn Sie den Wert auf 0 (null) festlegen, wird der Vorabruf deaktiviert. Bei verwendung eines Prefetchs von ungleich 0 (null) besteht das Risiko, dass Nachrichten verloren gehen, obwohl die Leistung besser ist.
Parameters:
Returns:
processError
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder processError(Consumer
Der Fehlerhandler für den Prozessor, der bei einem Fehler beim Empfangen von Meldungen aufgerufen wird.
Parameters:
Returns:
processMessage
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder processMessage(Consumer
Der Nachrichtenverarbeitungsrückruf für den Prozessor, der ausgeführt wird, wenn eine Nachricht empfangen wird.
Parameters:
Returns:
queueName
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder queueName(String queueName)
Legt den Namen der Warteschlange fest, für die ein Prozessor erstellt werden soll.
Parameters:
Returns:
receiveMode
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)
Legt den Empfangsmodus für den Prozessor fest.
Parameters:
Returns:
sessionIdleTimeout
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder sessionIdleTimeout(Duration sessionIdleTimeout)
Legt fest, wie lange maximal auf den Empfang einer Nachricht für die aktuell aktive Sitzung gewartet werden soll. Nach Ablauf dieser Zeit schließt der Prozessor die Sitzung, und versucht, eine weitere Sitzung zu verarbeiten.
Nachdem der Prozessor eine Nachricht an den processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage) Handler übermittelt hat, kann der Prozessor die nächste Nachricht nicht mehr von der Sitzung empfangen, weil keine nächste Nachricht in der Sitzung vorhanden ist oder die verarbeitung der aktuellen Nachricht länger dauert als das sessionIdleTimeout
, dann tritt für die Sitzung ein Timeout auf. Um zu vermeiden, dass Sitzungen versehentlich verloren gehen, wählen Sie eine sessionIdleTimeout
aus, die größer als die Verarbeitungszeit einer Nachricht ist.
Wenn nicht angegeben, wird verwendet AmqpRetryOptions#getTryTimeout() .
Parameters:
Returns:
subQueue
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder subQueue(SubQueue subQueue)
Legt den Typ des fest, mit dem SubQueue eine Verbindung hergestellt werden soll. Azure Service Bus Warteschlangen und Abonnements stellen eine sekundäre Unterwarteschlange bereit, die als Warteschlange für unzustellbare Nachrichten (Dead-Letter Queue, DLQ) bezeichnet wird.
Parameters:
Returns:
subscriptionName
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder subscriptionName(String subscriptionName)
Legt den Namen des Abonnements im Thema fest, auf das lauscht werden soll. topicName(String topicName) muss ebenfalls festgelegt werden.
Parameters:
Returns:
topicName
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder topicName(String topicName)
Legt den Namen des Themas fest. subscriptionName(String subscriptionName) muss ebenfalls festgelegt werden.
Parameters:
Returns:
Gilt für:
Azure SDK for Java