Compartilhar via


EventProcessorClient Classe

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventProcessorClient

public class EventProcessorClient

EventProcessorClient fornece um mecanismo conveniente para consumir eventos de todas as partições de um Hub de Eventos no contexto de um grupo de consumidores. O aplicativo baseado em Processador de Eventos consiste em uma ou mais instâncias de EventProcessorClient(s) que são configuradas para consumir eventos do mesmo Hub de Eventos, grupo de consumidores para equilibrar a carga de trabalho em instâncias diferentes e acompanhar o progresso quando os eventos são processados. Com base no número de instâncias em execução, cada EventProcessorClient pode ter zero ou mais partições para equilibrar a carga de trabalho entre todas as instâncias.

Exemplo: Construir um EventProcessorClient

O exemplo a seguir usa um azure-messaging-eventhubs-checkpointstore-blob na memória CheckpointStore fornece um repositório de ponto de verificação apoiado por Armazenamento de Blobs do Azure. Além disso, fullyQualifiedNamespace é o nome do host do Namespace dos Hubs de Eventos. Ele é listado no painel "Essentials" depois de navegar até o Namespace dos Hubs de Eventos por meio do Portal do Azure. A credencial usada é DefaultAzureCredential porque combina credenciais comumente usadas na implantação e desenvolvimento e escolhe a credencial a ser usada com base em seu ambiente de execução. O consumerGroup é encontrado navegando até a instância do Hub de Eventos e selecionando "Grupos de consumidores" no painel "Entidades". A consumerGroup é obrigatória. A credencial usada é DefaultAzureCredential porque combina credenciais comumente usadas na implantação e desenvolvimento e escolhe a credencial a ser usada com base em seu ambiente de execução.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup("<< CONSUMER GROUP NAME >>")
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .checkpointStore(new SampleCheckpointStore())
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .buildEventProcessorClient();

Resumo do método

Modificador e tipo Método e descrição
String getIdentifier()

O identificador é um nome exclusivo fornecido a essa instância do processador de eventos.

synchronized boolean isRunning()

Retorna true se o processador de eventos estiver em execução.

synchronized void start()

Inicia o processamento de eventos para todas as partições do Hub de Eventos que esse processador de eventos pode possuir, atribuindo um dedicado PartitionProcessor a cada partição.

synchronized void stop()

Interrompe o processamento de eventos para todas as partições pertencentes a esse processador de eventos.

Métodos herdados de java.lang.Object

Detalhes do método

getIdentifier

public String getIdentifier()

O identificador é um nome exclusivo fornecido a essa instância do processador de eventos.

Returns:

Identificador para este processador de eventos.

isRunning

public synchronized boolean isRunning()

Retorna true se o processador de eventos estiver em execução. Se o processador de eventos já estiver em execução, chamar start() não terá efeito.

Returns:

true se o processador de eventos estiver em execução.

start

public synchronized void start()

Inicia o processamento de eventos para todas as partições do Hub de Eventos que esse processador de eventos pode possuir, atribuindo um dedicado PartitionProcessor a cada partição. Se houver outros Processadores de Eventos ativos para o mesmo grupo de consumidores no Hub de Eventos, a responsabilidade pelas partições será compartilhada entre eles.

As chamadas subsequentes para iniciar serão ignoradas se esse processador de eventos já estiver em execução. A chamada iniciar depois stop() de ser chamada reiniciará esse processador de eventos.

Iniciando o processador para consumir eventos de todas as partições

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .checkpointStore(new SampleCheckpointStore())
     .buildEventProcessorClient();

 eventProcessorClient.start();

 // Continue to perform other tasks while the processor is running in the background.
 //
 // Finally, stop the processor client when application is finished.
 eventProcessorClient.stop();

stop

public synchronized void stop()

Interrompe o processamento de eventos para todas as partições pertencentes a esse processador de eventos. Todos serão desligados e todos os PartitionProcessor recursos abertos serão fechados.

As chamadas subsequentes a serem interrompidas serão ignoradas se o processador de eventos não estiver em execução.

Parando o processador

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .checkpointStore(new SampleCheckpointStore())
     .buildEventProcessorClient();

 eventProcessorClient.start();

 // Continue to perform other tasks while the processor is running in the background.
 //
 // Finally, stop the processor client when application is finished.
 eventProcessorClient.stop();

Aplica-se a