ServiceBusProcessorClient Class
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusProcessorClient
- com.
Implements
public final class ServiceBusProcessorClient
implements AutoCloseable
The processor client for processing Service Bus messages. ServiceBusProcessorClient provides a push-based mechanism that invokes the message processing callback when a message is received or the error handler when an error occurs when receiving messages. A ServiceBusProcessorClient can be created to process messages for a session-enabled or non session-enabled Service Bus entity. It supports auto-settlement of messages by default.
Sample code to instantiate a processor client and receive in PeekLock mode
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (RuntimeException error) {
System.out.printf("Completion of the message %s failed.%n Error: %s%n",
message.getMessageId(), error);
}
} else {
try {
context.abandon();
} catch (RuntimeException error) {
System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
message.getMessageId(), error);
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
Sample code to instantiate a processor client and receive in ReceiveAndDelete mode
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
Create and run a session-enabled processor
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
Consumer<ServiceBusErrorContext> onError = context -> {
System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
context.getFullyQualifiedNamespace(), context.getEntityPath());
if (context.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) context.getException();
System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", context.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.sessionProcessor()
.queueName(sessionEnabledQueueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete()
.maxConcurrentSessions(2)
.processMessage(onMessage)
.processError(onError)
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
sessionProcessor.start();
// Stop processor and dispose when done processing messages.
sessionProcessor.stop();
sessionProcessor.close();
Method Summary
Modifier and Type | Method and Description |
---|---|
synchronized void |
close()
Stops message processing and closes the processor. |
synchronized String |
getIdentifier()
Gets the identifier of the instance of ServiceBusProcessorClient. |
String |
getQueueName()
Returns the queue name associated with this instance of ServiceBusProcessorClient. |
String |
getSubscriptionName()
Returns the subscription name associated with this instance of ServiceBusProcessorClient. |
String |
getTopicName()
Returns the topic name associated with this instance of ServiceBusProcessorClient. |
synchronized boolean |
isRunning()
Returns |
synchronized void |
start()
Starts the processor in the background. |
synchronized void |
stop()
Stops the message processing for this processor. |
Methods inherited from java.lang.Object
Method Details
close
public synchronized void close()
Stops message processing and closes the processor. The receiving links and sessions are closed and calling start() will create a new processing cycle with new links and new sessions.
getIdentifier
public synchronized String getIdentifier()
Gets the identifier of the instance of ServiceBusProcessorClient.
Returns:
getQueueName
public String getQueueName()
Returns the queue name associated with this instance of ServiceBusProcessorClient.
Returns:
null
if
the processor instance is for a topic and subscription.getSubscriptionName
public String getSubscriptionName()
Returns the subscription name associated with this instance of ServiceBusProcessorClient.
Returns:
null
if the processor instance is for a queue.getTopicName
public String getTopicName()
Returns the topic name associated with this instance of ServiceBusProcessorClient.
Returns:
null
if
the processor instance is for a queue.isRunning
public synchronized boolean isRunning()
Returns true
if the processor is running. If the processor is stopped or closed, this method returns false
.
Returns:
true
if the processor is running; false
otherwise.start
public synchronized void start()
Starts the processor in the background. When this method is called, the processor will initiate a message receiver that will invoke the message handler when new messages are available. This method is idempotent (ie. calling start()
again after the processor is already running is a no-op).
Calling start()
after calling stop() will resume processing messages using the same underlying connection.
Calling start()
after calling close() will start the processor with a new connection.
stop
public synchronized void stop()
Stops the message processing for this processor. The receiving links and sessions are kept active and this processor can resume processing messages by calling start() again.