Compartir a través de


EventHubConsumerAsyncClient Clase

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventHubConsumerAsyncClient

Implementaciones

public class EventHubConsumerAsyncClient
implements Closeable

Un consumidor asincrónico responsable de leer EventData desde una partición específica del centro de eventos o todas las particiones en el contexto de un grupo de consumidores específico.

Los ejemplos que se muestran en este documento usan un objeto de credencial denominado DefaultAzureCredential para la autenticación, que es adecuado para la mayoría de los escenarios, incluidos los entornos de desarrollo y producción locales. Además, se recomienda usar la identidad administrada para la autenticación en entornos de producción. Puede encontrar más información sobre las distintas formas de autenticación y sus tipos de credenciales correspondientes en la documentación de Azure Identity.

Ejemplo: Creación de un EventHubConsumerAsyncClient

En el ejemplo de código siguiente se muestra la creación del cliente EventHubConsumerAsyncClientasincrónico . fullyQualifiedNamespace es el nombre de host del espacio de nombres de Event Hubs. Aparece en el panel "Essentials" después de navegar al espacio de nombres de Event Hubs a través de Azure Portal. Para consumerGroup encontrar , vaya a la instancia del centro de eventos y seleccione "Grupos de consumidores" en el panel "Entidades". consumerGroup(String consumerGroup) es necesario para crear clientes de consumidor. La credencial usada se debe DefaultAzureCredential a que combina credenciales usadas habitualmente en la implementación y el desarrollo y elige la credencial que se usará en función de su entorno en ejecución.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventHubProducerAsyncClient producer = new EventHubClientBuilder()
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .buildAsyncProducerClient();

Ejemplo: Consumo de eventos de una sola partición desde el centro de eventos

En el ejemplo de código siguiente se muestra cómo recibir eventos de la partición "0" de un centro de eventos a partir de latest(). latest() apunta al final del flujo de partición. El consumidor recibe eventos puestos en cola después de empezar a suscribirse a eventos.

receiveFromPartition(String partitionId, EventPosition startingPosition) es una llamada sin bloqueo. Después de configurar la operación, se devuelve su representación asincrónica. Flux Debe suscribirse a , como en el ejemplo siguiente, para empezar a recibir eventos.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         new DefaultAzureCredentialBuilder().build())
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .buildAsyncConsumerClient();

 // Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
 String partitionId = "0";
 EventPosition startingPosition = EventPosition.latest();

 // Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 //
 // NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
 // operation.  If the program ends after this, or the class is immediately disposed, no events will be
 // received.
 Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
     .subscribe(partitionEvent -> {
         PartitionContext partitionContext = partitionEvent.getPartitionContext();
         EventData event = partitionEvent.getData();

         System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
         System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
     }, error -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.err.print("An error occurred:" + error);
     }, () -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.out.print("Stream has ended.");
     });

Ejemplo: Incluir la información de partición más reciente en eventos recibidos

EventData se puede decorar con la información de partición más reciente y enviarse a los consumidores. Habilite esto estableciendo setTrackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties) en true. A medida que entran los eventos, explore el PartitionEvent objeto . Esto es útil en escenarios en los que los clientes quieren mantener información actualizada constante sobre su centro de eventos. Esto tiene un impacto de rendimiento, ya que la información de partición adicional debe enviarse a través de la conexión con cada evento.

receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions) es una llamada sin bloqueo. Después de configurar la operación, se devuelve su representación asincrónica. Flux Debe suscribirse a, como se muestra a continuación, para empezar a recibir eventos.

// Set `setTrackLastEnqueuedEventProperties` to true to get the last enqueued information from the partition for
 // each event that is received.
 ReceiveOptions receiveOptions = new ReceiveOptions()
     .setTrackLastEnqueuedEventProperties(true);
 EventPosition startingPosition = EventPosition.earliest();

 // Receives events from partition "0" starting at the beginning of the stream.
 // Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 Disposable subscription = consumer.receiveFromPartition("0", startingPosition, receiveOptions)
     .subscribe(partitionEvent -> {
         LastEnqueuedEventProperties properties = partitionEvent.getLastEnqueuedEventProperties();
         System.out.printf("Information received at %s. Last enqueued sequence number: %s%n",
             properties.getRetrievalTime(),
             properties.getSequenceNumber());
     });

Ejemplo: Limitación de velocidad del consumo de eventos del centro de eventos

Para los consumidores de eventos que necesitan limitar el número de eventos que reciben en un momento dado, pueden usar BaseSubscriber#request(long). El uso de un suscriptor personalizado permite a los desarrolladores controlar más pormenorizado la velocidad a la que reciben eventos.

receiveFromPartition(String partitionId, EventPosition startingPosition) es una llamada sin bloqueo. Después de configurar la operación, se devuelve su representación asincrónica. Flux Debe suscribirse a , como en el ejemplo siguiente, para empezar a recibir eventos.

consumer.receiveFromPartition(partitionId, EventPosition.latest()).subscribe(new BaseSubscriber<PartitionEvent>() {
     private static final int NUMBER_OF_EVENTS = 5;
     private final AtomicInteger currentNumberOfEvents = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 events at a time.
         request(NUMBER_OF_EVENTS);
     }

     @Override
     protected void hookOnNext(PartitionEvent value) {
         // Process the EventData

         // If the number of events we have currently received is a multiple of 5, that means we have reached the
         // last event the Publisher will provide to us. Invoking request(long) here, tells the Publisher that
         // the subscriber is ready to get more events from upstream.
         if (currentNumberOfEvents.incrementAndGet() % 5 == 0) {
             request(NUMBER_OF_EVENTS);
         }
     }
 });

Ejemplo: Recepción de todas las particiones

En el ejemplo de código siguiente se muestra cómo recibir eventos de todas las particiones de un centro de eventos a partir del principio de la secuencia de cada partición. Esto es útil para fines de demostración, pero no está pensado para escenarios de producción. Para escenarios de producción, considere la posibilidad de usar EventProcessorClient.

receive(boolean startReadingAtEarliestEvent) es una llamada sin bloqueo. Después de configurar la operación, se devuelve su representación asincrónica. Flux Debe suscribirse a , como en el ejemplo siguiente, para empezar a recibir eventos.

// Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 Disposable subscription = consumer.receive(true)
     .subscribe(partitionEvent -> {
         PartitionContext context = partitionEvent.getPartitionContext();
         EventData event = partitionEvent.getData();

         System.out.printf("Event %s is from partition %s%n.", event.getSequenceNumber(),
             context.getPartitionId());
     }, error -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.err.print("An error occurred:" + error);
     }, () -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.out.print("Stream has ended.");
     });

Resumen del método

Modificador y tipo Método y descripción
void close()

Elimina al consumidor cerrando la conexión subyacente al servicio.

String getConsumerGroup()

Obtiene el grupo de consumidores del que este consumidor lee los eventos como parte de .

String getEventHubName()

Obtiene el nombre del centro de eventos con el que interactúa este cliente.

Mono<EventHubProperties> getEventHubProperties()

Recupera información sobre un centro de eventos, incluido el número de particiones presentes y sus identificadores.

String getFullyQualifiedNamespace()

Obtiene el espacio de nombres completo de Event Hubs al que está asociada la conexión.

String getIdentifier()

Obtiene el identificador de cliente.

Flux<String> getPartitionIds()

Recupera los identificadores de las particiones de un centro de eventos.

Mono<PartitionProperties> getPartitionProperties(String partitionId)

Recupera información sobre una partición específica para un centro de eventos, incluidos los elementos que describen los eventos disponibles en el flujo de eventos de partición.

Flux<PartitionEvent> receive()

Consume eventos de todas las particiones a partir del principio de cada partición.

Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent)

Consume eventos de todas las particiones.

Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)

Consume eventos de todas las particiones configuradas con un conjunto de receiveOptions.

Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition)

Consume eventos de una sola partición a partir de startingPosition.

Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)

Consume eventos de una sola partición a partir startingPosition de con un conjunto de ReceiveOptions.

Métodos heredados de java.lang.Object

Detalles del método

close

public void close()

Elimina al consumidor cerrando la conexión subyacente al servicio.

getConsumerGroup

public String getConsumerGroup()

Obtiene el grupo de consumidores del que este consumidor lee los eventos como parte de .

Returns:

El grupo de consumidores del que este consumidor lee eventos como parte.

getEventHubName

public String getEventHubName()

Obtiene el nombre del centro de eventos con el que interactúa este cliente.

Returns:

El nombre del centro de eventos con el que interactúa este cliente.

getEventHubProperties

public Mono getEventHubProperties()

Recupera información sobre un centro de eventos, incluido el número de particiones presentes y sus identificadores.

Returns:

Conjunto de información para el centro de eventos al que está asociado este cliente.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Obtiene el espacio de nombres completo de Event Hubs al que está asociada la conexión. Es probable que sea similar a {yournamespace}.servicebus.windows.net.

Returns:

Espacio de nombres completo de Event Hubs al que está asociada la conexión

getIdentifier

public String getIdentifier()

Obtiene el identificador de cliente.

Returns:

Cadena de identificador único para el cliente actual.

getPartitionIds

public Flux getPartitionIds()

Recupera los identificadores de las particiones de un centro de eventos.

Returns:

Flujo de identificadores para las particiones de un centro de eventos.

getPartitionProperties

public Mono getPartitionProperties(String partitionId)

Recupera información sobre una partición específica para un centro de eventos, incluidos los elementos que describen los eventos disponibles en el flujo de eventos de partición.

Parameters:

partitionId - Identificador único de una partición asociada al centro de eventos.

Returns:

Conjunto de información para la partición solicitada en el centro de eventos al que está asociado este cliente.

receive

public Flux receive()

Consume eventos de todas las particiones a partir del principio de cada partición.

Este método no se recomienda para su uso en producción; EventProcessorClient se debe usar para leer eventos de todas las particiones de un escenario de producción, ya que ofrece una experiencia mucho más sólida con un mayor rendimiento. Es importante tener en cuenta que este método no garantiza la equidad entre las particiones. En función de la comunicación del servicio, puede haber una agrupación en clústeres de eventos por partición o puede haber un sesgo notable para una partición determinada o un subconjunto de particiones.

Returns:

Secuencia de eventos para cada partición del centro de eventos a partir del principio de cada partición.

receive

public Flux receive(boolean startReadingAtEarliestEvent)

Consume eventos de todas las particiones.

Este método no se recomienda para su uso en producción; EventProcessorClient se debe usar para leer eventos de todas las particiones en un escenario de producción, ya que ofrece una experiencia mucho más sólida con un mayor rendimiento. Es importante tener en cuenta que este método no garantiza la equidad entre las particiones. En función de la comunicación del servicio, puede haber una agrupación en clústeres de eventos por partición o puede haber un sesgo notable para una partición determinada o un subconjunto de particiones.

Parameters:

startReadingAtEarliestEvent - true para empezar a leer en los primeros eventos disponibles en cada partición; De lo contrario, la lectura comenzará al final de cada partición viendo solo los nuevos eventos a medida que se publican.

Returns:

Secuencia de eventos para cada partición del centro de eventos.

receive

public Flux receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)

Consume eventos de todas las particiones configuradas con un conjunto de receiveOptions.

Este método no se recomienda para su uso en producción; EventProcessorClient se debe usar para leer eventos de todas las particiones en un escenario de producción, ya que ofrece una experiencia mucho más sólida con un mayor rendimiento. Es importante tener en cuenta que este método no garantiza la equidad entre las particiones. En función de la comunicación del servicio, puede haber una agrupación en clústeres de eventos por partición o puede haber un sesgo notable para una partición determinada o un subconjunto de particiones.

  • Si receive se invoca donde getOwnerLevel() tiene un valor, el servicio Event Hubs garantizará que solo exista un consumidor activo por partitionId y por combinación de grupo de consumidores. Esta operación de recepción se conoce a veces como "Consumidor de época".
  • Se pueden crear varios consumidores por partitionId y combinación de grupo de consumidores al invocar getOwnerLevel() operaciones de recepción. A veces, este consumidor no exclusivo se conoce como "Consumidor no de época".

Parameters:

startReadingAtEarliestEvent - true para empezar a leer en los primeros eventos disponibles en cada partición; De lo contrario, la lectura comenzará al final de cada partición viendo solo los nuevos eventos a medida que se publican.
receiveOptions - Opciones al recibir eventos de cada partición del centro de eventos.

Returns:

Secuencia de eventos para cada partición del centro de eventos.

receiveFromPartition

public Flux receiveFromPartition(String partitionId, EventPosition startingPosition)

Consume eventos de una sola partición a partir de startingPosition.

Parameters:

partitionId - Identificador de la partición desde la que se van a leer los eventos.
startingPosition - Posición dentro de la partición del centro de eventos para empezar a consumir eventos.

Returns:

Secuencia de eventos para esta partición a partir de startingPosition.

receiveFromPartition

public Flux receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)

Consume eventos de una sola partición a partir startingPosition de con un conjunto de ReceiveOptions.

  • Si receive se invoca donde getOwnerLevel() tiene un valor, el servicio Event Hubs garantizará que solo exista un consumidor activo por partitionId y por combinación de grupo de consumidores. Esta operación de recepción se conoce a veces como "Consumidor de época".
  • Se pueden crear varios consumidores por partitionId y combinación de grupo de consumidores al invocar getOwnerLevel() operaciones de recepción. A veces, este consumidor no exclusivo se conoce como "Consumidor no de época".

Parameters:

partitionId - Identificador de la partición desde la que se van a leer los eventos.
startingPosition - Posición dentro de la partición del centro de eventos para empezar a consumir eventos.
receiveOptions - Opciones al recibir eventos de la partición.

Returns:

Secuencia de eventos para esta partición. Si se abrió una secuencia para los eventos antes, se devuelve la misma posición dentro de esa partición. De lo contrario, los eventos se leen a partir de startingPosition.

Se aplica a