EventHubConsumerAsyncClient Classe
- java.
lang. Object - com.
azure. messaging. eventhubs. EventHubConsumerAsyncClient
- com.
Implementações
public class EventHubConsumerAsyncClient
implements Closeable
Um consumidor assíncrono responsável pela leitura EventData de uma partição específica do Hub de Eventos ou de todas as partições no contexto de um grupo de consumidores específico.
Os exemplos mostrados neste documento usam um objeto de credencial chamado DefaultAzureCredential para autenticação, que é apropriado para a maioria dos cenários, incluindo ambientes locais de desenvolvimento e produção. Além disso, recomendamos usar a identidade gerenciada para autenticação em ambientes de produção. Você pode encontrar mais informações sobre diferentes maneiras de autenticação e seus tipos de credencial correspondentes na documentação da Identidade do Azure".
Exemplo: criando um EventHubConsumerAsyncClient
O exemplo de código a seguir demonstra a criação do cliente EventHubConsumerAsyncClientassíncrono . O fullyQualifiedNamespace
é o nome do host do Namespace dos Hubs de Eventos. Ele é listado no painel "Essentials" depois de navegar até o Namespace dos Hubs de Eventos por meio do Portal do Azure. O consumerGroup
é encontrado navegando até a instância do Hub de Eventos e selecionando "Grupos de consumidores" no painel "Entidades". O consumerGroup(String consumerGroup) é necessário para criar clientes consumidores. A credencial usada é DefaultAzureCredential
porque combina credenciais comumente usadas na implantação e desenvolvimento e escolhe a credencial a ser usada com base em seu ambiente de execução.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.buildAsyncProducerClient();
Exemplo: consumindo eventos de uma única partição do Hub de Eventos
O exemplo de código abaixo demonstra o recebimento de eventos da partição "0" de um Hub de Eventos a partir de latest(). latest() aponta para o final do fluxo de partição. O consumidor recebe eventos enfileirados depois que começou a assinar eventos.
receiveFromPartition(String partitionId, EventPosition startingPosition) é uma chamada sem bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Flux
deve ser inscrito, como o exemplo abaixo, para começar a receber eventos.
EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
new DefaultAzureCredentialBuilder().build())
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildAsyncConsumerClient();
// Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
String partitionId = "0";
EventPosition startingPosition = EventPosition.latest();
// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
//
// NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
// operation. If the program ends after this, or the class is immediately disposed, no events will be
// received.
Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
.subscribe(partitionEvent -> {
PartitionContext partitionContext = partitionEvent.getPartitionContext();
EventData event = partitionEvent.getData();
System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
}, error -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.err.print("An error occurred:" + error);
}, () -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.out.print("Stream has ended.");
});
Exemplo: incluindo as informações de partição mais recentes em eventos recebidos
EventData pode ser decorado com as informações de partição mais recentes e enviado aos consumidores. Habilite isso definindo como setTrackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)true
. À medida que os eventos entram, explore o PartitionEvent objeto . Isso é útil em cenários em que os clientes desejam constanter informações atualizadas sobre o Hub de Eventos. Isso leva um impacto no desempenho, pois as informações de partição extras devem ser enviadas pela transmissão a cada evento.
receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions) é uma chamada sem bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Flux
deve ser assinado, como o exemplo abaixo, para começar a receber eventos.
// Set `setTrackLastEnqueuedEventProperties` to true to get the last enqueued information from the partition for
// each event that is received.
ReceiveOptions receiveOptions = new ReceiveOptions()
.setTrackLastEnqueuedEventProperties(true);
EventPosition startingPosition = EventPosition.earliest();
// Receives events from partition "0" starting at the beginning of the stream.
// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
Disposable subscription = consumer.receiveFromPartition("0", startingPosition, receiveOptions)
.subscribe(partitionEvent -> {
LastEnqueuedEventProperties properties = partitionEvent.getLastEnqueuedEventProperties();
System.out.printf("Information received at %s. Last enqueued sequence number: %s%n",
properties.getRetrievalTime(),
properties.getSequenceNumber());
});
Exemplo: taxa que limita o consumo de eventos do Hub de Eventos
Para consumidores de eventos que precisam limitar o número de eventos que recebem em um determinado momento, eles podem usar BaseSubscriber#request(long). O uso de um assinante personalizado permite aos desenvolvedores um controle mais granular sobre a taxa em que eles recebem eventos.
receiveFromPartition(String partitionId, EventPosition startingPosition) é uma chamada sem bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Flux
deve ser inscrito, como o exemplo abaixo, para começar a receber eventos.
consumer.receiveFromPartition(partitionId, EventPosition.latest()).subscribe(new BaseSubscriber<PartitionEvent>() {
private static final int NUMBER_OF_EVENTS = 5;
private final AtomicInteger currentNumberOfEvents = new AtomicInteger();
@Override
protected void hookOnSubscribe(Subscription subscription) {
// Tell the Publisher we only want 5 events at a time.
request(NUMBER_OF_EVENTS);
}
@Override
protected void hookOnNext(PartitionEvent value) {
// Process the EventData
// If the number of events we have currently received is a multiple of 5, that means we have reached the
// last event the Publisher will provide to us. Invoking request(long) here, tells the Publisher that
// the subscriber is ready to get more events from upstream.
if (currentNumberOfEvents.incrementAndGet() % 5 == 0) {
request(NUMBER_OF_EVENTS);
}
}
});
Exemplo: recebimento de todas as partições
O exemplo de código abaixo demonstra o recebimento de eventos de todas as partições de um Hub de Eventos iniciando o início do fluxo de cada partição. Isso é valioso para fins de demonstração, mas não se destina a cenários de produção. Para cenários de produção, considere o uso de EventProcessorClient.
receive(boolean startReadingAtEarliestEvent) é uma chamada sem bloqueio. Depois de configurar a operação, sua representação assíncrona é retornada. O Flux
deve ser inscrito, como o exemplo abaixo, para começar a receber eventos.
// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
Disposable subscription = consumer.receive(true)
.subscribe(partitionEvent -> {
PartitionContext context = partitionEvent.getPartitionContext();
EventData event = partitionEvent.getData();
System.out.printf("Event %s is from partition %s%n.", event.getSequenceNumber(),
context.getPartitionId());
}, error -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.err.print("An error occurred:" + error);
}, () -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.out.print("Stream has ended.");
});
Resumo do método
Modificador e tipo | Método e descrição |
---|---|
void |
close()
Descarta o consumidor fechando a conexão subjacente com o serviço. |
String |
getConsumerGroup()
Obtém o grupo de consumidores do qual esse consumidor está lendo eventos como parte. |
String |
getEventHubName()
Obtém o nome do Hub de Eventos com o qual esse cliente interage. |
Mono<Event |
getEventHubProperties()
Recupera informações sobre um Hub de Eventos, incluindo o número de partições presentes e seus identificadores. |
String |
getFullyQualifiedNamespace()
Obtém o namespace dos Hubs de Eventos totalmente qualificado ao qual a conexão está associada. |
String |
getIdentifier()
Obtém o identificador do cliente. |
Flux<String> |
getPartitionIds()
Recupera os identificadores para as partições de um Hub de Eventos. |
Mono<Partition |
getPartitionProperties(String partitionId)
Recupera informações sobre uma partição específica para um Hub de Eventos, incluindo elementos que descrevem os eventos disponíveis no fluxo de eventos de partição. |
Flux<Partition |
receive()
Consome eventos de todas as partições a partir do início de cada partição. |
Flux<Partition |
receive(boolean startReadingAtEarliestEvent)
Consome eventos de todas as partições. |
Flux<Partition |
receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)
Consome eventos de todas as partições configuradas com um conjunto de |
Flux<Partition |
receiveFromPartition(String partitionId, EventPosition startingPosition)
Consome eventos de uma única partição começando em |
Flux<Partition |
receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)
Consome eventos de uma única partição começando em |
Métodos herdados de java.lang.Object
Detalhes do método
close
public void close()
Descarta o consumidor fechando a conexão subjacente com o serviço.
getConsumerGroup
public String getConsumerGroup()
Obtém o grupo de consumidores do qual esse consumidor está lendo eventos como parte.
Returns:
getEventHubName
public String getEventHubName()
Obtém o nome do Hub de Eventos com o qual esse cliente interage.
Returns:
getEventHubProperties
public Mono
Recupera informações sobre um Hub de Eventos, incluindo o número de partições presentes e seus identificadores.
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
Obtém o namespace dos Hubs de Eventos totalmente qualificado ao qual a conexão está associada. Isso provavelmente é semelhante a {yournamespace}.servicebus.windows.net
.
Returns:
getIdentifier
public String getIdentifier()
Obtém o identificador do cliente.
Returns:
getPartitionIds
public Flux
Recupera os identificadores para as partições de um Hub de Eventos.
Returns:
getPartitionProperties
public Mono
Recupera informações sobre uma partição específica para um Hub de Eventos, incluindo elementos que descrevem os eventos disponíveis no fluxo de eventos de partição.
Parameters:
Returns:
receive
public Flux
Consome eventos de todas as partições a partir do início de cada partição.
Esse método não é recomendado para uso em produção; o EventProcessorClient deve ser usado para ler eventos de todas as partições em um cenário de produção, pois oferece uma experiência muito mais robusta com maior taxa de transferência. É importante observar que esse método não garante a imparcialidade entre as partições. Dependendo da comunicação do serviço, pode haver uma clustering de eventos por partição e/ou pode haver um viés perceptível para uma determinada partição ou subconjunto de partições.
Returns:
receive
public Flux
Consome eventos de todas as partições.
Esse método não é recomendado para uso em produção; o EventProcessorClient deve ser usado para ler eventos de todas as partições em um cenário de produção, pois oferece uma experiência muito mais robusta com maior taxa de transferência. É importante observar que esse método não garante a imparcialidade entre as partições. Dependendo da comunicação do serviço, pode haver uma clustering de eventos por partição e/ou pode haver um viés perceptível para uma determinada partição ou subconjunto de partições.
Parameters:
true
para começar a ler os primeiros eventos disponíveis em cada partição; caso contrário, a leitura começará no final de cada partição, vendo apenas novos eventos conforme eles são publicados.
Returns:
receive
public Flux
Consome eventos de todas as partições configuradas com um conjunto de receiveOptions
.
Esse método não é recomendado para uso em produção; o EventProcessorClient deve ser usado para ler eventos de todas as partições em um cenário de produção, pois oferece uma experiência muito mais robusta com maior taxa de transferência. É importante observar que esse método não garante a imparcialidade entre as partições. Dependendo da comunicação do serviço, pode haver uma clustering de eventos por partição e/ou pode haver um viés perceptível para uma determinada partição ou subconjunto de partições.
- Se receive for invocado quando getOwnerLevel() tiver um valor, o serviço de Hubs de Eventos garantirá que apenas um consumidor ativo exista por partitionId e combinação de grupo de consumidores. Às vezes, essa operação de recebimento é chamada de "Consumidor de Época".
- Vários consumidores por partitionId e combinação de grupo de consumidores podem ser criados por não configuração getOwnerLevel() ao invocar operações de recebimento. Esse consumidor não exclusivo às vezes é chamado de "Consumidor não-Época".
Parameters:
true
para começar a ler os primeiros eventos disponíveis em cada partição; caso contrário, a leitura começará no final de cada partição, vendo apenas novos eventos conforme eles são publicados.
Returns:
receiveFromPartition
public Flux
Consome eventos de uma única partição começando em startingPosition
.
Parameters:
Returns:
startingPosition
.receiveFromPartition
public Flux
Consome eventos de uma única partição começando em startingPosition
com um conjunto de ReceiveOptions.
- Se receive for invocado quando getOwnerLevel() tiver um valor, o serviço de Hubs de Eventos garantirá que apenas um consumidor ativo exista por partitionId e combinação de grupo de consumidores. Às vezes, essa operação de recebimento é chamada de "Consumidor de Época".
- Vários consumidores por partitionId e combinação de grupo de consumidores podem ser criados por não configuração getOwnerLevel() ao invocar operações de recebimento. Esse consumidor não exclusivo às vezes é chamado de "Consumidor não-Época".
Parameters:
Returns:
startingPosition
.Aplica-se a
Azure SDK for Java