Compartilhar via


ServiceBusProcessorClient Classe

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusProcessorClient

Implementações

public final class ServiceBusProcessorClient
implements AutoCloseable

O cliente do processador para processar mensagens do Barramento de Serviço. ServiceBusProcessorClient fornece um mecanismo baseado em push que invoca o retorno de chamada de processamento de mensagens quando uma mensagem é recebida ou o manipulador de erros quando ocorre um erro ao receber mensagens. Um ServiceBusProcessorClient pode ser criado para processar mensagens para uma entidade do Barramento de Serviço habilitada para sessão ou não habilitada para sessão. Ele dá suporte à liquidação automática de mensagens por padrão.

Código de exemplo para instanciar um cliente processador e receber no modo PeekLock

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
     // handling message reaches desired state such that it doesn't require Service Bus to redeliver
     // the same message, then context.complete() should be called otherwise context.abandon().
     final boolean success = Math.random() < 0.5;
     if (success) {
         try {
             context.complete();
         } catch (RuntimeException error) {
             System.out.printf("Completion of the message %s failed.%n Error: %s%n",
                 message.getMessageId(), error);
         }
     } else {
         try {
             context.abandon();
         } catch (RuntimeException error) {
             System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     }
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()  // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

Código de exemplo para instanciar um cliente processador e receber no modo ReceiveAndDelete

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

Criar e executar um processador habilitado para sessão

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
     ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 Consumer<ServiceBusErrorContext> onError = context -> {
     System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
         context.getFullyQualifiedNamespace(), context.getEntityPath());

     if (context.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) context.getException();

         System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", context.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .sessionProcessor()
     .queueName(sessionEnabledQueueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()
     .maxConcurrentSessions(2)
     .processMessage(onMessage)
     .processError(onError)
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 sessionProcessor.start();

 // Stop processor and dispose when done processing messages.
 sessionProcessor.stop();
 sessionProcessor.close();

Resumo do método

Modificador e tipo Método e descrição
synchronized void close()

Interrompe o processamento de mensagens e fecha o processador.

synchronized String getIdentifier()

Obtém o identificador da instância do ServiceBusProcessorClient.

String getQueueName()

Retorna o nome da fila associado a essa instância do ServiceBusProcessorClient.

String getSubscriptionName()

Retorna o nome da assinatura associado a esta instância do ServiceBusProcessorClient.

String getTopicName()

Retorna o nome do tópico associado a esta instância do ServiceBusProcessorClient.

synchronized boolean isRunning()

Retornará true se o processador estiver em execução.

synchronized void start()

Inicia o processador em segundo plano.

synchronized void stop()

Interrompe o processamento de mensagens para esse processador.

Métodos herdados de java.lang.Object

Detalhes do método

close

public synchronized void close()

Interrompe o processamento de mensagens e fecha o processador. Os links e sessões de recebimento são fechados e a chamada start() criará um novo ciclo de processamento com novos links e novas sessões.

getIdentifier

public synchronized String getIdentifier()

Obtém o identificador da instância do ServiceBusProcessorClient.

Returns:

O identificador que pode identificar a instância do ServiceBusProcessorClient.

getQueueName

public String getQueueName()

Retorna o nome da fila associado a essa instância do ServiceBusProcessorClient.

Returns:

o nome da fila associado a essa instância do ServiceBusProcessorClient ou null se a instância do processador for para um tópico e uma assinatura.

getSubscriptionName

public String getSubscriptionName()

Retorna o nome da assinatura associado a esta instância do ServiceBusProcessorClient.

Returns:

o nome da assinatura associado a essa instância do ServiceBusProcessorClient ou null se a instância do processador for para uma fila.

getTopicName

public String getTopicName()

Retorna o nome do tópico associado a esta instância do ServiceBusProcessorClient.

Returns:

o nome do tópico associado a esta instância do ServiceBusProcessorClient ou null se a instância do processador for para uma fila.

isRunning

public synchronized boolean isRunning()

Retornará true se o processador estiver em execução. Se o processador for interrompido ou fechado, esse método retornará false.

Returns:

true se o processador estiver em execução; false Caso contrário.

start

public synchronized void start()

Inicia o processador em segundo plano. Quando esse método for chamado, o processador iniciará um receptor de mensagem que invocará o manipulador de mensagens quando novas mensagens estiverem disponíveis. Esse método é idempotente (ou seja, chamar start() novamente depois que o processador já estiver em execução é uma operação sem operações).

Chamar start() depois de chamar stop() retomará o processamento de mensagens usando a mesma conexão subjacente.

Chamar start() depois de chamar close() iniciará o processador com uma nova conexão.

stop

public synchronized void stop()

Interrompe o processamento de mensagens para esse processador. Os links de recebimento e as sessões são mantidos ativos e esse processador pode retomar o processamento de mensagens chamando start() novamente.

Aplica-se a