Compartilhar via


EventHubConsumerAsyncClient Classe

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventHubConsumerAsyncClient

Implementações

public class EventHubConsumerAsyncClient
implements Closeable

Um consumidor assíncrono responsável pela leitura EventData de uma partição específica do Hub de Eventos ou de todas as partições no contexto de um grupo de consumidores específico.

Os exemplos mostrados neste documento usam um objeto de credencial chamado DefaultAzureCredential para autenticação, que é apropriado para a maioria dos cenários, incluindo ambientes locais de desenvolvimento e produção. Além disso, recomendamos usar a identidade gerenciada para autenticação em ambientes de produção. Você pode encontrar mais informações sobre diferentes maneiras de autenticação e seus tipos de credencial correspondentes na documentação da Identidade do Azure".

Exemplo: criando um EventHubConsumerAsyncClient

O exemplo de código a seguir demonstra a criação do cliente EventHubConsumerAsyncClientassíncrono . O fullyQualifiedNamespace é o nome do host do Namespace dos Hubs de Eventos. Ele é listado no painel "Essentials" depois de navegar até o Namespace dos Hubs de Eventos por meio do Portal do Azure. O consumerGroup é encontrado navegando até a instância do Hub de Eventos e selecionando "Grupos de consumidores" no painel "Entidades". O consumerGroup(String consumerGroup) é necessário para criar clientes consumidores. A credencial usada é DefaultAzureCredential porque combina credenciais comumente usadas na implantação e desenvolvimento e escolhe a credencial a ser usada com base em seu ambiente de execução.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventHubProducerAsyncClient producer = new EventHubClientBuilder()
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .buildAsyncProducerClient();

Exemplo: consumindo eventos de uma única partição do Hub de Eventos

O exemplo de código abaixo demonstra o recebimento de eventos da partição "0" de um Hub de Eventos a partir de latest(). latest() aponta para o final do fluxo de partição. O consumidor recebe eventos enfileirados depois que começou a assinar eventos.

receiveFromPartition(String partitionId, EventPosition startingPosition) é uma chamada sem bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Flux deve ser inscrito, como o exemplo abaixo, para começar a receber eventos.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         new DefaultAzureCredentialBuilder().build())
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .buildAsyncConsumerClient();

 // Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
 String partitionId = "0";
 EventPosition startingPosition = EventPosition.latest();

 // Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 //
 // NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
 // operation.  If the program ends after this, or the class is immediately disposed, no events will be
 // received.
 Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
     .subscribe(partitionEvent -> {
         PartitionContext partitionContext = partitionEvent.getPartitionContext();
         EventData event = partitionEvent.getData();

         System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
         System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
     }, error -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.err.print("An error occurred:" + error);
     }, () -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.out.print("Stream has ended.");
     });

Exemplo: incluindo as informações de partição mais recentes em eventos recebidos

EventData pode ser decorado com as informações de partição mais recentes e enviado aos consumidores. Habilite isso definindo como setTrackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)true. À medida que os eventos entram, explore o PartitionEvent objeto . Isso é útil em cenários em que os clientes desejam constanter informações atualizadas sobre o Hub de Eventos. Isso leva um impacto no desempenho, pois as informações de partição extras devem ser enviadas pela transmissão a cada evento.

receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions) é uma chamada sem bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Flux deve ser assinado, como o exemplo abaixo, para começar a receber eventos.

// Set `setTrackLastEnqueuedEventProperties` to true to get the last enqueued information from the partition for
 // each event that is received.
 ReceiveOptions receiveOptions = new ReceiveOptions()
     .setTrackLastEnqueuedEventProperties(true);
 EventPosition startingPosition = EventPosition.earliest();

 // Receives events from partition "0" starting at the beginning of the stream.
 // Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 Disposable subscription = consumer.receiveFromPartition("0", startingPosition, receiveOptions)
     .subscribe(partitionEvent -> {
         LastEnqueuedEventProperties properties = partitionEvent.getLastEnqueuedEventProperties();
         System.out.printf("Information received at %s. Last enqueued sequence number: %s%n",
             properties.getRetrievalTime(),
             properties.getSequenceNumber());
     });

Exemplo: taxa que limita o consumo de eventos do Hub de Eventos

Para consumidores de eventos que precisam limitar o número de eventos que recebem em um determinado momento, eles podem usar BaseSubscriber#request(long). O uso de um assinante personalizado permite aos desenvolvedores um controle mais granular sobre a taxa em que eles recebem eventos.

receiveFromPartition(String partitionId, EventPosition startingPosition) é uma chamada sem bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Flux deve ser inscrito, como o exemplo abaixo, para começar a receber eventos.

consumer.receiveFromPartition(partitionId, EventPosition.latest()).subscribe(new BaseSubscriber<PartitionEvent>() {
     private static final int NUMBER_OF_EVENTS = 5;
     private final AtomicInteger currentNumberOfEvents = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 events at a time.
         request(NUMBER_OF_EVENTS);
     }

     @Override
     protected void hookOnNext(PartitionEvent value) {
         // Process the EventData

         // If the number of events we have currently received is a multiple of 5, that means we have reached the
         // last event the Publisher will provide to us. Invoking request(long) here, tells the Publisher that
         // the subscriber is ready to get more events from upstream.
         if (currentNumberOfEvents.incrementAndGet() % 5 == 0) {
             request(NUMBER_OF_EVENTS);
         }
     }
 });

Exemplo: recebimento de todas as partições

O exemplo de código abaixo demonstra o recebimento de eventos de todas as partições de um Hub de Eventos iniciando o início do fluxo de cada partição. Isso é valioso para fins de demonstração, mas não se destina a cenários de produção. Para cenários de produção, considere o uso de EventProcessorClient.

receive(boolean startReadingAtEarliestEvent) é uma chamada sem bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Flux deve ser inscrito, como o exemplo abaixo, para começar a receber eventos.

// Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 Disposable subscription = consumer.receive(true)
     .subscribe(partitionEvent -> {
         PartitionContext context = partitionEvent.getPartitionContext();
         EventData event = partitionEvent.getData();

         System.out.printf("Event %s is from partition %s%n.", event.getSequenceNumber(),
             context.getPartitionId());
     }, error -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.err.print("An error occurred:" + error);
     }, () -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.out.print("Stream has ended.");
     });

Resumo do método

Modificador e tipo Método e descrição
void close()

Descarta o consumidor fechando a conexão subjacente com o serviço.

String getConsumerGroup()

Obtém o grupo de consumidores do qual esse consumidor está lendo eventos como parte.

String getEventHubName()

Obtém o nome do Hub de Eventos com o qual esse cliente interage.

Mono<EventHubProperties> getEventHubProperties()

Recupera informações sobre um Hub de Eventos, incluindo o número de partições presentes e seus identificadores.

String getFullyQualifiedNamespace()

Obtém o namespace dos Hubs de Eventos totalmente qualificado ao qual a conexão está associada.

String getIdentifier()

Obtém o identificador do cliente.

Flux<String> getPartitionIds()

Recupera os identificadores para as partições de um Hub de Eventos.

Mono<PartitionProperties> getPartitionProperties(String partitionId)

Recupera informações sobre uma partição específica para um Hub de Eventos, incluindo elementos que descrevem os eventos disponíveis no fluxo de eventos de partição.

Flux<PartitionEvent> receive()

Consome eventos de todas as partições a partir do início de cada partição.

Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent)

Consome eventos de todas as partições.

Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)

Consome eventos de todas as partições configuradas com um conjunto de receiveOptions.

Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition)

Consome eventos de uma única partição começando em startingPosition.

Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)

Consome eventos de uma única partição começando em startingPosition com um conjunto de ReceiveOptions.

Métodos herdados de java.lang.Object

Detalhes do método

close

public void close()

Descarta o consumidor fechando a conexão subjacente com o serviço.

getConsumerGroup

public String getConsumerGroup()

Obtém o grupo de consumidores do qual esse consumidor está lendo eventos como parte.

Returns:

O grupo de consumidores do qual esse consumidor está lendo eventos como parte.

getEventHubName

public String getEventHubName()

Obtém o nome do Hub de Eventos com o qual esse cliente interage.

Returns:

O nome do Hub de Eventos com o qual esse cliente interage.

getEventHubProperties

public Mono getEventHubProperties()

Recupera informações sobre um Hub de Eventos, incluindo o número de partições presentes e seus identificadores.

Returns:

O conjunto de informações para o Hub de Eventos ao qual esse cliente está associado.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Obtém o namespace dos Hubs de Eventos totalmente qualificado ao qual a conexão está associada. Isso provavelmente é semelhante a {yournamespace}.servicebus.windows.net.

Returns:

O namespace dos Hubs de Eventos totalmente qualificado ao qual a conexão está associada

getIdentifier

public String getIdentifier()

Obtém o identificador do cliente.

Returns:

A cadeia de caracteres de identificador exclusiva para o cliente atual.

getPartitionIds

public Flux getPartitionIds()

Recupera os identificadores para as partições de um Hub de Eventos.

Returns:

Um Fluxo de identificadores para as partições de um Hub de Eventos.

getPartitionProperties

public Mono getPartitionProperties(String partitionId)

Recupera informações sobre uma partição específica para um Hub de Eventos, incluindo elementos que descrevem os eventos disponíveis no fluxo de eventos de partição.

Parameters:

partitionId - O identificador exclusivo de uma partição associada ao Hub de Eventos.

Returns:

O conjunto de informações para a partição solicitada no Hub de Eventos ao qual esse cliente está associado.

receive

public Flux receive()

Consome eventos de todas as partições a partir do início de cada partição.

Esse método não é recomendado para uso em produção; o EventProcessorClient deve ser usado para ler eventos de todas as partições em um cenário de produção, pois oferece uma experiência muito mais robusta com maior taxa de transferência. É importante observar que esse método não garante a imparcialidade entre as partições. Dependendo da comunicação do serviço, pode haver uma clustering de eventos por partição e/ou pode haver um viés perceptível para uma determinada partição ou subconjunto de partições.

Returns:

Um fluxo de eventos para cada partição no Hub de Eventos a partir do início de cada partição.

receive

public Flux receive(boolean startReadingAtEarliestEvent)

Consome eventos de todas as partições.

Esse método não é recomendado para uso em produção; o EventProcessorClient deve ser usado para ler eventos de todas as partições em um cenário de produção, pois oferece uma experiência muito mais robusta com maior taxa de transferência. É importante observar que esse método não garante a imparcialidade entre as partições. Dependendo da comunicação do serviço, pode haver uma clustering de eventos por partição e/ou pode haver um viés perceptível para uma determinada partição ou subconjunto de partições.

Parameters:

startReadingAtEarliestEvent - true para começar a ler os primeiros eventos disponíveis em cada partição; caso contrário, a leitura começará no final de cada partição, vendo apenas novos eventos conforme eles são publicados.

Returns:

Um fluxo de eventos para cada partição no Hub de Eventos.

receive

public Flux receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)

Consome eventos de todas as partições configuradas com um conjunto de receiveOptions.

Esse método não é recomendado para uso em produção; o EventProcessorClient deve ser usado para ler eventos de todas as partições em um cenário de produção, pois oferece uma experiência muito mais robusta com maior taxa de transferência. É importante observar que esse método não garante a imparcialidade entre as partições. Dependendo da comunicação do serviço, pode haver uma clustering de eventos por partição e/ou pode haver um viés perceptível para uma determinada partição ou subconjunto de partições.

  • Se receive for invocado quando getOwnerLevel() tiver um valor, o serviço de Hubs de Eventos garantirá que apenas um consumidor ativo exista por partitionId e combinação de grupo de consumidores. Às vezes, essa operação de recebimento é chamada de "Consumidor de Época".
  • Vários consumidores por partitionId e combinação de grupo de consumidores podem ser criados por não configuração getOwnerLevel() ao invocar operações de recebimento. Esse consumidor não exclusivo às vezes é chamado de "Consumidor não-Época".

Parameters:

startReadingAtEarliestEvent - true para começar a ler os primeiros eventos disponíveis em cada partição; caso contrário, a leitura começará no final de cada partição, vendo apenas novos eventos conforme eles são publicados.
receiveOptions - Opções ao receber eventos de cada partição do Hub de Eventos.

Returns:

Um fluxo de eventos para cada partição no Hub de Eventos.

receiveFromPartition

public Flux receiveFromPartition(String partitionId, EventPosition startingPosition)

Consome eventos de uma única partição começando em startingPosition.

Parameters:

partitionId - Identificador da partição da qual ler eventos.
startingPosition - Posicione dentro da partição do Hub de Eventos para começar a consumir eventos.

Returns:

Um fluxo de eventos para essa partição a partir de startingPosition.

receiveFromPartition

public Flux receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)

Consome eventos de uma única partição começando em startingPosition com um conjunto de ReceiveOptions.

  • Se receive for invocado quando getOwnerLevel() tiver um valor, o serviço de Hubs de Eventos garantirá que apenas um consumidor ativo exista por partitionId e combinação de grupo de consumidores. Às vezes, essa operação de recebimento é chamada de "Consumidor de Época".
  • Vários consumidores por partitionId e combinação de grupo de consumidores podem ser criados por não configuração getOwnerLevel() ao invocar operações de recebimento. Esse consumidor não exclusivo às vezes é chamado de "Consumidor não-Época".

Parameters:

partitionId - Identificador da partição da qual ler eventos.
startingPosition - Posicione dentro da partição do Hub de Eventos para começar a consumir eventos.
receiveOptions - Opções ao receber eventos da partição.

Returns:

Um fluxo de eventos para essa partição. Se um fluxo para os eventos tiver sido aberto antes, a mesma posição dentro dessa partição será retornada. Caso contrário, os eventos serão lidos a partir de startingPosition.

Aplica-se a