ServiceBusProcessorClient Clase
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusProcessorClient
- com.
Implementaciones
public final class ServiceBusProcessorClient
implements AutoCloseable
Cliente del procesador para procesar mensajes de Service Bus. ServiceBusProcessorClient proporciona un mecanismo basado en inserción que invoca la devolución de llamada de procesamiento de mensajes cuando se recibe un mensaje o el controlador de errores cuando se produce un error al recibir mensajes. ServiceBusProcessorClient Se puede crear para procesar mensajes de una entidad de Service Bus habilitada para sesión o no habilitada para la sesión. Admite la liquidación automática de mensajes de forma predeterminada.
Código de ejemplo para crear instancias de un cliente de procesador y recibirlo en modo PeekLock
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (RuntimeException error) {
System.out.printf("Completion of the message %s failed.%n Error: %s%n",
message.getMessageId(), error);
}
} else {
try {
context.abandon();
} catch (RuntimeException error) {
System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
message.getMessageId(), error);
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
Código de ejemplo para crear una instancia de un cliente de procesador y recibir en el modo ReceiveAndDelete
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
Creación y ejecución de un procesador habilitado para sesión
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
Consumer<ServiceBusErrorContext> onError = context -> {
System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
context.getFullyQualifiedNamespace(), context.getEntityPath());
if (context.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) context.getException();
System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", context.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.sessionProcessor()
.queueName(sessionEnabledQueueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete()
.maxConcurrentSessions(2)
.processMessage(onMessage)
.processError(onError)
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
sessionProcessor.start();
// Stop processor and dispose when done processing messages.
sessionProcessor.stop();
sessionProcessor.close();
Resumen del método
Modificador y tipo | Método y descripción |
---|---|
synchronized void |
close()
Detiene el procesamiento de mensajes y cierra el procesador. |
synchronized String |
getIdentifier()
Obtiene el identificador de la instancia de ServiceBusProcessorClient. |
String |
getQueueName()
Devuelve el nombre de cola asociado a esta instancia de ServiceBusProcessorClient. |
String |
getSubscriptionName()
Devuelve el nombre de la suscripción asociado a esta instancia de ServiceBusProcessorClient. |
String |
getTopicName()
Devuelve el nombre del tema asociado a esta instancia de ServiceBusProcessorClient. |
synchronized boolean |
isRunning()
Devuelve |
synchronized void |
start()
Inicia el procesador en segundo plano. |
synchronized void |
stop()
Detiene el procesamiento de mensajes para este procesador. |
Métodos heredados de java.lang.Object
Detalles del método
close
public synchronized void close()
Detiene el procesamiento de mensajes y cierra el procesador. Los vínculos y sesiones receptores se cierran y las llamadas start() crearán un nuevo ciclo de procesamiento con nuevos vínculos y sesiones nuevas.
getIdentifier
public synchronized String getIdentifier()
Obtiene el identificador de la instancia de ServiceBusProcessorClient.
Returns:
getQueueName
public String getQueueName()
Devuelve el nombre de cola asociado a esta instancia de ServiceBusProcessorClient.
Returns:
null
si la instancia del ServiceBusProcessorClient procesador es para un tema y una suscripción.getSubscriptionName
public String getSubscriptionName()
Devuelve el nombre de la suscripción asociado a esta instancia de ServiceBusProcessorClient.
Returns:
null
si la instancia del procesador es para una cola.getTopicName
public String getTopicName()
Devuelve el nombre del tema asociado a esta instancia de ServiceBusProcessorClient.
Returns:
null
si la instancia del ServiceBusProcessorClient procesador es para una cola.isRunning
public synchronized boolean isRunning()
Devuelve true
si el procesador se está ejecutando. Si el procesador se detiene o se cierra, este método devuelve false
.
Returns:
true
si el procesador se está ejecutando; false
Lo contrario.start
public synchronized void start()
Inicia el procesador en segundo plano. Cuando se llama a este método, el procesador iniciará un receptor de mensajes que invocará al controlador de mensajes cuando haya nuevos mensajes disponibles. Este método es idempotente (es decir, llamar start()
de nuevo después de que el procesador ya se esté ejecutando es una operación sin operación).
Una llamada después de start()
llamar stop() reanudará el procesamiento de mensajes mediante la misma conexión subyacente.
Una llamada start()
después de llamar close() iniciará el procesador con una nueva conexión.
stop
public synchronized void stop()
Detiene el procesamiento de mensajes para este procesador. Los vínculos y sesiones receptores se mantienen activos y este procesador puede reanudar el procesamiento de mensajes llamando start() de nuevo.
Se aplica a
Azure SDK for Java