EventHubConsumerAsyncClient Clase
- java.
lang. Object - com.
azure. messaging. eventhubs. EventHubConsumerAsyncClient
- com.
Implementaciones
public class EventHubConsumerAsyncClient
implements Closeable
Un consumidor asincrónico responsable de leer EventData desde una partición específica del centro de eventos o todas las particiones en el contexto de un grupo de consumidores específico.
Los ejemplos que se muestran en este documento usan un objeto de credencial denominado DefaultAzureCredential para la autenticación, que es adecuado para la mayoría de los escenarios, incluidos los entornos de desarrollo y producción locales. Además, se recomienda usar la identidad administrada para la autenticación en entornos de producción. Puede encontrar más información sobre las distintas formas de autenticación y sus tipos de credenciales correspondientes en la documentación de Azure Identity.
Ejemplo: Creación de un EventHubConsumerAsyncClient
En el ejemplo de código siguiente se muestra la creación del cliente EventHubConsumerAsyncClientasincrónico . fullyQualifiedNamespace
es el nombre de host del espacio de nombres de Event Hubs. Aparece en el panel "Essentials" después de navegar al espacio de nombres de Event Hubs a través de Azure Portal. Para consumerGroup
encontrar , vaya a la instancia del centro de eventos y seleccione "Grupos de consumidores" en el panel "Entidades". consumerGroup(String consumerGroup) es necesario para crear clientes de consumidor. La credencial usada se debe DefaultAzureCredential
a que combina credenciales usadas habitualmente en la implementación y el desarrollo y elige la credencial que se usará en función de su entorno en ejecución.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.buildAsyncProducerClient();
Ejemplo: Consumo de eventos de una sola partición desde el centro de eventos
En el ejemplo de código siguiente se muestra cómo recibir eventos de la partición "0" de un centro de eventos a partir de latest(). latest() apunta al final del flujo de partición. El consumidor recibe eventos puestos en cola después de empezar a suscribirse a eventos.
receiveFromPartition(String partitionId, EventPosition startingPosition) es una llamada sin bloqueo. Después de configurar la operación, se devuelve su representación asincrónica. Flux
Debe suscribirse a , como en el ejemplo siguiente, para empezar a recibir eventos.
EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
new DefaultAzureCredentialBuilder().build())
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildAsyncConsumerClient();
// Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
String partitionId = "0";
EventPosition startingPosition = EventPosition.latest();
// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
//
// NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
// operation. If the program ends after this, or the class is immediately disposed, no events will be
// received.
Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
.subscribe(partitionEvent -> {
PartitionContext partitionContext = partitionEvent.getPartitionContext();
EventData event = partitionEvent.getData();
System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
}, error -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.err.print("An error occurred:" + error);
}, () -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.out.print("Stream has ended.");
});
Ejemplo: Incluir la información de partición más reciente en eventos recibidos
EventData se puede decorar con la información de partición más reciente y enviarse a los consumidores. Habilite esto estableciendo setTrackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties) en true
. A medida que entran los eventos, explore el PartitionEvent objeto . Esto es útil en escenarios en los que los clientes quieren mantener información actualizada constante sobre su centro de eventos. Esto tiene un impacto de rendimiento, ya que la información de partición adicional debe enviarse a través de la conexión con cada evento.
receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions) es una llamada sin bloqueo. Después de configurar la operación, se devuelve su representación asincrónica. Flux
Debe suscribirse a, como se muestra a continuación, para empezar a recibir eventos.
// Set `setTrackLastEnqueuedEventProperties` to true to get the last enqueued information from the partition for
// each event that is received.
ReceiveOptions receiveOptions = new ReceiveOptions()
.setTrackLastEnqueuedEventProperties(true);
EventPosition startingPosition = EventPosition.earliest();
// Receives events from partition "0" starting at the beginning of the stream.
// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
Disposable subscription = consumer.receiveFromPartition("0", startingPosition, receiveOptions)
.subscribe(partitionEvent -> {
LastEnqueuedEventProperties properties = partitionEvent.getLastEnqueuedEventProperties();
System.out.printf("Information received at %s. Last enqueued sequence number: %s%n",
properties.getRetrievalTime(),
properties.getSequenceNumber());
});
Ejemplo: Limitación de velocidad del consumo de eventos del centro de eventos
Para los consumidores de eventos que necesitan limitar el número de eventos que reciben en un momento dado, pueden usar BaseSubscriber#request(long). El uso de un suscriptor personalizado permite a los desarrolladores controlar más pormenorizado la velocidad a la que reciben eventos.
receiveFromPartition(String partitionId, EventPosition startingPosition) es una llamada sin bloqueo. Después de configurar la operación, se devuelve su representación asincrónica. Flux
Debe suscribirse a , como en el ejemplo siguiente, para empezar a recibir eventos.
consumer.receiveFromPartition(partitionId, EventPosition.latest()).subscribe(new BaseSubscriber<PartitionEvent>() {
private static final int NUMBER_OF_EVENTS = 5;
private final AtomicInteger currentNumberOfEvents = new AtomicInteger();
@Override
protected void hookOnSubscribe(Subscription subscription) {
// Tell the Publisher we only want 5 events at a time.
request(NUMBER_OF_EVENTS);
}
@Override
protected void hookOnNext(PartitionEvent value) {
// Process the EventData
// If the number of events we have currently received is a multiple of 5, that means we have reached the
// last event the Publisher will provide to us. Invoking request(long) here, tells the Publisher that
// the subscriber is ready to get more events from upstream.
if (currentNumberOfEvents.incrementAndGet() % 5 == 0) {
request(NUMBER_OF_EVENTS);
}
}
});
Ejemplo: Recepción de todas las particiones
En el ejemplo de código siguiente se muestra cómo recibir eventos de todas las particiones de un centro de eventos a partir del principio de la secuencia de cada partición. Esto es útil para fines de demostración, pero no está pensado para escenarios de producción. Para escenarios de producción, considere la posibilidad de usar EventProcessorClient.
receive(boolean startReadingAtEarliestEvent) es una llamada sin bloqueo. Después de configurar la operación, se devuelve su representación asincrónica. Flux
Debe suscribirse a , como en el ejemplo siguiente, para empezar a recibir eventos.
// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
Disposable subscription = consumer.receive(true)
.subscribe(partitionEvent -> {
PartitionContext context = partitionEvent.getPartitionContext();
EventData event = partitionEvent.getData();
System.out.printf("Event %s is from partition %s%n.", event.getSequenceNumber(),
context.getPartitionId());
}, error -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.err.print("An error occurred:" + error);
}, () -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.out.print("Stream has ended.");
});
Resumen del método
Modificador y tipo | Método y descripción |
---|---|
void |
close()
Elimina al consumidor cerrando la conexión subyacente al servicio. |
String |
getConsumerGroup()
Obtiene el grupo de consumidores del que este consumidor lee los eventos como parte de . |
String |
getEventHubName()
Obtiene el nombre del centro de eventos con el que interactúa este cliente. |
Mono<Event |
getEventHubProperties()
Recupera información sobre un centro de eventos, incluido el número de particiones presentes y sus identificadores. |
String |
getFullyQualifiedNamespace()
Obtiene el espacio de nombres completo de Event Hubs al que está asociada la conexión. |
String |
getIdentifier()
Obtiene el identificador de cliente. |
Flux<String> |
getPartitionIds()
Recupera los identificadores de las particiones de un centro de eventos. |
Mono<Partition |
getPartitionProperties(String partitionId)
Recupera información sobre una partición específica para un centro de eventos, incluidos los elementos que describen los eventos disponibles en el flujo de eventos de partición. |
Flux<Partition |
receive()
Consume eventos de todas las particiones a partir del principio de cada partición. |
Flux<Partition |
receive(boolean startReadingAtEarliestEvent)
Consume eventos de todas las particiones. |
Flux<Partition |
receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)
Consume eventos de todas las particiones configuradas con un conjunto de |
Flux<Partition |
receiveFromPartition(String partitionId, EventPosition startingPosition)
Consume eventos de una sola partición a partir de |
Flux<Partition |
receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)
Consume eventos de una sola partición a partir |
Métodos heredados de java.lang.Object
Detalles del método
close
public void close()
Elimina al consumidor cerrando la conexión subyacente al servicio.
getConsumerGroup
public String getConsumerGroup()
Obtiene el grupo de consumidores del que este consumidor lee los eventos como parte de .
Returns:
getEventHubName
public String getEventHubName()
Obtiene el nombre del centro de eventos con el que interactúa este cliente.
Returns:
getEventHubProperties
public Mono
Recupera información sobre un centro de eventos, incluido el número de particiones presentes y sus identificadores.
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
Obtiene el espacio de nombres completo de Event Hubs al que está asociada la conexión. Es probable que sea similar a {yournamespace}.servicebus.windows.net
.
Returns:
getIdentifier
public String getIdentifier()
Obtiene el identificador de cliente.
Returns:
getPartitionIds
public Flux
Recupera los identificadores de las particiones de un centro de eventos.
Returns:
getPartitionProperties
public Mono
Recupera información sobre una partición específica para un centro de eventos, incluidos los elementos que describen los eventos disponibles en el flujo de eventos de partición.
Parameters:
Returns:
receive
public Flux
Consume eventos de todas las particiones a partir del principio de cada partición.
Este método no se recomienda para su uso en producción; EventProcessorClient se debe usar para leer eventos de todas las particiones de un escenario de producción, ya que ofrece una experiencia mucho más sólida con un mayor rendimiento. Es importante tener en cuenta que este método no garantiza la equidad entre las particiones. En función de la comunicación del servicio, puede haber una agrupación en clústeres de eventos por partición o puede haber un sesgo notable para una partición determinada o un subconjunto de particiones.
Returns:
receive
public Flux
Consume eventos de todas las particiones.
Este método no se recomienda para su uso en producción; EventProcessorClient se debe usar para leer eventos de todas las particiones en un escenario de producción, ya que ofrece una experiencia mucho más sólida con un mayor rendimiento. Es importante tener en cuenta que este método no garantiza la equidad entre las particiones. En función de la comunicación del servicio, puede haber una agrupación en clústeres de eventos por partición o puede haber un sesgo notable para una partición determinada o un subconjunto de particiones.
Parameters:
true
para empezar a leer en los primeros eventos disponibles en cada partición; De lo contrario, la lectura comenzará al final de cada partición viendo solo los nuevos eventos a medida que se publican.
Returns:
receive
public Flux
Consume eventos de todas las particiones configuradas con un conjunto de receiveOptions
.
Este método no se recomienda para su uso en producción; EventProcessorClient se debe usar para leer eventos de todas las particiones en un escenario de producción, ya que ofrece una experiencia mucho más sólida con un mayor rendimiento. Es importante tener en cuenta que este método no garantiza la equidad entre las particiones. En función de la comunicación del servicio, puede haber una agrupación en clústeres de eventos por partición o puede haber un sesgo notable para una partición determinada o un subconjunto de particiones.
- Si receive se invoca donde getOwnerLevel() tiene un valor, el servicio Event Hubs garantizará que solo exista un consumidor activo por partitionId y por combinación de grupo de consumidores. Esta operación de recepción se conoce a veces como "Consumidor de época".
- Se pueden crear varios consumidores por partitionId y combinación de grupo de consumidores al invocar getOwnerLevel() operaciones de recepción. A veces, este consumidor no exclusivo se conoce como "Consumidor no de época".
Parameters:
true
para empezar a leer en los primeros eventos disponibles en cada partición; De lo contrario, la lectura comenzará al final de cada partición viendo solo los nuevos eventos a medida que se publican.
Returns:
receiveFromPartition
public Flux
Consume eventos de una sola partición a partir de startingPosition
.
Parameters:
Returns:
startingPosition
.receiveFromPartition
public Flux
Consume eventos de una sola partición a partir startingPosition
de con un conjunto de ReceiveOptions.
- Si receive se invoca donde getOwnerLevel() tiene un valor, el servicio Event Hubs garantizará que solo exista un consumidor activo por partitionId y por combinación de grupo de consumidores. Esta operación de recepción se conoce a veces como "Consumidor de época".
- Se pueden crear varios consumidores por partitionId y combinación de grupo de consumidores al invocar getOwnerLevel() operaciones de recepción. A veces, este consumidor no exclusivo se conoce como "Consumidor no de época".
Parameters:
Returns:
startingPosition
.Se aplica a
Azure SDK for Java