Compartir a través de


ServiceBusProcessorClient Clase

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusProcessorClient

Implementaciones

public final class ServiceBusProcessorClient
implements AutoCloseable

Cliente del procesador para procesar mensajes de Service Bus. ServiceBusProcessorClient proporciona un mecanismo basado en inserción que invoca la devolución de llamada de procesamiento de mensajes cuando se recibe un mensaje o el controlador de errores cuando se produce un error al recibir mensajes. ServiceBusProcessorClient Se puede crear para procesar mensajes de una entidad de Service Bus habilitada para sesión o no habilitada para la sesión. Admite la liquidación automática de mensajes de forma predeterminada.

Código de ejemplo para crear instancias de un cliente de procesador y recibirlo en modo PeekLock

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
     // handling message reaches desired state such that it doesn't require Service Bus to redeliver
     // the same message, then context.complete() should be called otherwise context.abandon().
     final boolean success = Math.random() < 0.5;
     if (success) {
         try {
             context.complete();
         } catch (RuntimeException error) {
             System.out.printf("Completion of the message %s failed.%n Error: %s%n",
                 message.getMessageId(), error);
         }
     } else {
         try {
             context.abandon();
         } catch (RuntimeException error) {
             System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     }
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()  // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

Código de ejemplo para crear una instancia de un cliente de procesador y recibir en el modo ReceiveAndDelete

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

Creación y ejecución de un procesador habilitado para sesión

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
     ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 Consumer<ServiceBusErrorContext> onError = context -> {
     System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
         context.getFullyQualifiedNamespace(), context.getEntityPath());

     if (context.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) context.getException();

         System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", context.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .sessionProcessor()
     .queueName(sessionEnabledQueueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()
     .maxConcurrentSessions(2)
     .processMessage(onMessage)
     .processError(onError)
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 sessionProcessor.start();

 // Stop processor and dispose when done processing messages.
 sessionProcessor.stop();
 sessionProcessor.close();

Resumen del método

Modificador y tipo Método y descripción
synchronized void close()

Detiene el procesamiento de mensajes y cierra el procesador.

synchronized String getIdentifier()

Obtiene el identificador de la instancia de ServiceBusProcessorClient.

String getQueueName()

Devuelve el nombre de cola asociado a esta instancia de ServiceBusProcessorClient.

String getSubscriptionName()

Devuelve el nombre de la suscripción asociado a esta instancia de ServiceBusProcessorClient.

String getTopicName()

Devuelve el nombre del tema asociado a esta instancia de ServiceBusProcessorClient.

synchronized boolean isRunning()

Devuelve true si el procesador se está ejecutando.

synchronized void start()

Inicia el procesador en segundo plano.

synchronized void stop()

Detiene el procesamiento de mensajes para este procesador.

Métodos heredados de java.lang.Object

Detalles del método

close

public synchronized void close()

Detiene el procesamiento de mensajes y cierra el procesador. Los vínculos y sesiones receptores se cierran y las llamadas start() crearán un nuevo ciclo de procesamiento con nuevos vínculos y sesiones nuevas.

getIdentifier

public synchronized String getIdentifier()

Obtiene el identificador de la instancia de ServiceBusProcessorClient.

Returns:

Identificador que puede identificar la instancia de ServiceBusProcessorClient.

getQueueName

public String getQueueName()

Devuelve el nombre de cola asociado a esta instancia de ServiceBusProcessorClient.

Returns:

el nombre de la cola asociado a esta instancia de o null si la instancia del ServiceBusProcessorClient procesador es para un tema y una suscripción.

getSubscriptionName

public String getSubscriptionName()

Devuelve el nombre de la suscripción asociado a esta instancia de ServiceBusProcessorClient.

Returns:

el nombre de la suscripción asociado a esta instancia de ServiceBusProcessorClient o null si la instancia del procesador es para una cola.

getTopicName

public String getTopicName()

Devuelve el nombre del tema asociado a esta instancia de ServiceBusProcessorClient.

Returns:

el nombre del tema asociado a esta instancia de o null si la instancia del ServiceBusProcessorClient procesador es para una cola.

isRunning

public synchronized boolean isRunning()

Devuelve true si el procesador se está ejecutando. Si el procesador se detiene o se cierra, este método devuelve false.

Returns:

true si el procesador se está ejecutando; false Lo contrario.

start

public synchronized void start()

Inicia el procesador en segundo plano. Cuando se llama a este método, el procesador iniciará un receptor de mensajes que invocará al controlador de mensajes cuando haya nuevos mensajes disponibles. Este método es idempotente (es decir, llamar start() de nuevo después de que el procesador ya se esté ejecutando es una operación sin operación).

Una llamada después de start() llamar stop() reanudará el procesamiento de mensajes mediante la misma conexión subyacente.

Una llamada start() después de llamar close() iniciará el procesador con una nueva conexión.

stop

public synchronized void stop()

Detiene el procesamiento de mensajes para este procesador. Los vínculos y sesiones receptores se mantienen activos y este procesador puede reanudar el procesamiento de mensajes llamando start() de nuevo.

Se aplica a