Partager via


EventHubConsumerAsyncClient Classe

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventHubConsumerAsyncClient

Implémente

public class EventHubConsumerAsyncClient
implements Closeable

Consommateur asynchrone responsable de la lecture EventData à partir d’une partition Event Hub spécifique ou de toutes les partitions dans le contexte d’un groupe de consommateurs spécifique.

Les exemples présentés dans ce document utilisent un objet d’informations d’identification nommé DefaultAzureCredential pour l’authentification, ce qui est approprié pour la plupart des scénarios, y compris les environnements de développement et de production locaux. En outre, nous vous recommandons d’utiliser l’identité managée pour l’authentification dans les environnements de production. Vous trouverez plus d’informations sur les différentes méthodes d’authentification et leurs types d’informations d’identification correspondants dans la documentation Azure Identity .

Exemple : création d’un EventHubConsumerAsyncClient

L’exemple de code suivant illustre la création du client EventHubConsumerAsyncClientasynchrone . fullyQualifiedNamespace est le nom d’hôte de l’espace de noms Event Hubs. Il est répertorié sous le volet « Essentials » après avoir accédé à l’espace de noms Event Hubs via le portail Azure. consumerGroup Le est trouvé en accédant à l’instance Event Hub, puis en sélectionnant « Groupes de consommateurs » sous le panneau « Entités ». consumerGroup(String consumerGroup) est requis pour créer des clients consommateurs. Les informations d’identification utilisées sont DefaultAzureCredential dues au fait qu’elles combinent les informations d’identification couramment utilisées dans le déploiement et le développement, et qu’elles choisissent les informations d’identification à utiliser en fonction de leur environnement en cours d’exécution.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventHubProducerAsyncClient producer = new EventHubClientBuilder()
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .buildAsyncProducerClient();

Exemple : Consommation d’événements d’une partition unique à partir d’Event Hub

L’exemple de code ci-dessous illustre la réception d’événements à partir de la partition « 0 » d’un hub d’événements à partir de latest(). latest() pointe vers la fin du flux de partition. Le consommateur reçoit les événements en file d’attente après avoir commencé à s’abonner aux événements.

receiveFromPartition(String partitionId, EventPosition startingPosition) est un appel non bloquant. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Flux doit être abonné, comme l’exemple ci-dessous, pour commencer à recevoir des événements.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         new DefaultAzureCredentialBuilder().build())
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .buildAsyncConsumerClient();

 // Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
 String partitionId = "0";
 EventPosition startingPosition = EventPosition.latest();

 // Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 //
 // NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
 // operation.  If the program ends after this, or the class is immediately disposed, no events will be
 // received.
 Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
     .subscribe(partitionEvent -> {
         PartitionContext partitionContext = partitionEvent.getPartitionContext();
         EventData event = partitionEvent.getData();

         System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
         System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
     }, error -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.err.print("An error occurred:" + error);
     }, () -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.out.print("Stream has ended.");
     });

Exemple : Inclure les dernières informations de partition dans les événements reçus

EventData peut être décoré avec les dernières informations de partition et envoyé aux consommateurs. Activez cette option en définissant sur setTrackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)true. Au fur et à mesure que des événements entrent, explorez l’objet PartitionEvent . Cela est utile dans les scénarios où les clients souhaitent obtenir des informations à jour constantes sur leur Hub d’événements. Cela prend un impact sur les performances, car les informations de partition supplémentaires doivent être envoyées sur le câble à chaque événement.

receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions) est un appel non bloquant. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Flux doit être abonné, comme l’exemple ci-dessous, pour commencer à recevoir des événements.

// Set `setTrackLastEnqueuedEventProperties` to true to get the last enqueued information from the partition for
 // each event that is received.
 ReceiveOptions receiveOptions = new ReceiveOptions()
     .setTrackLastEnqueuedEventProperties(true);
 EventPosition startingPosition = EventPosition.earliest();

 // Receives events from partition "0" starting at the beginning of the stream.
 // Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 Disposable subscription = consumer.receiveFromPartition("0", startingPosition, receiveOptions)
     .subscribe(partitionEvent -> {
         LastEnqueuedEventProperties properties = partitionEvent.getLastEnqueuedEventProperties();
         System.out.printf("Information received at %s. Last enqueued sequence number: %s%n",
             properties.getRetrievalTime(),
             properties.getSequenceNumber());
     });

Exemple : Limitation de la consommation des événements à partir d’Event Hub

Pour les consommateurs d’événements qui doivent limiter le nombre d’événements qu’ils reçoivent à un moment donné, ils peuvent utiliser BaseSubscriber#request(long). L’utilisation d’un abonné personnalisé permet aux développeurs de contrôler plus précisément la fréquence à laquelle ils reçoivent les événements.

receiveFromPartition(String partitionId, EventPosition startingPosition) est un appel non bloquant. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Flux doit être abonné, comme l’exemple ci-dessous, pour commencer à recevoir des événements.

consumer.receiveFromPartition(partitionId, EventPosition.latest()).subscribe(new BaseSubscriber<PartitionEvent>() {
     private static final int NUMBER_OF_EVENTS = 5;
     private final AtomicInteger currentNumberOfEvents = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 events at a time.
         request(NUMBER_OF_EVENTS);
     }

     @Override
     protected void hookOnNext(PartitionEvent value) {
         // Process the EventData

         // If the number of events we have currently received is a multiple of 5, that means we have reached the
         // last event the Publisher will provide to us. Invoking request(long) here, tells the Publisher that
         // the subscriber is ready to get more events from upstream.
         if (currentNumberOfEvents.incrementAndGet() % 5 == 0) {
             request(NUMBER_OF_EVENTS);
         }
     }
 });

Exemple : réception de toutes les partitions

L’exemple de code ci-dessous illustre la réception d’événements de toutes les partitions d’un Event Hub à partir du début du flux de chaque partition. Cela est utile à des fins de démonstration, mais n’est pas destiné aux scénarios de production. Pour les scénarios de production, envisagez d’utiliser EventProcessorClient.

receive(boolean startReadingAtEarliestEvent) est un appel non bloquant. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Flux doit être abonné, comme l’exemple ci-dessous, pour commencer à recevoir des événements.

// Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 Disposable subscription = consumer.receive(true)
     .subscribe(partitionEvent -> {
         PartitionContext context = partitionEvent.getPartitionContext();
         EventData event = partitionEvent.getData();

         System.out.printf("Event %s is from partition %s%n.", event.getSequenceNumber(),
             context.getPartitionId());
     }, error -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.err.print("An error occurred:" + error);
     }, () -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.out.print("Stream has ended.");
     });

Résumé de la méthode

Modificateur et type Méthode et description
void close()

Supprime le consommateur en fermant la connexion sous-jacente au service.

String getConsumerGroup()

Obtient le groupe de consommateurs dont ce consommateur lit les événements dans le cadre de.

String getEventHubName()

Obtient le nom d’Event Hub avec lequel ce client interagit.

Mono<EventHubProperties> getEventHubProperties()

Récupère des informations sur un hub d’événements, notamment le nombre de partitions présentes et leurs identificateurs.

String getFullyQualifiedNamespace()

Obtient l’espace de noms Event Hubs complet auquel la connexion est associée.

String getIdentifier()

Obtient l’identificateur du client.

Flux<String> getPartitionIds()

Récupère les identificateurs des partitions d’un Event Hub.

Mono<PartitionProperties> getPartitionProperties(String partitionId)

Récupère des informations sur une partition spécifique pour un Event Hub, y compris des éléments qui décrivent les événements disponibles dans le flux d’événements de partition.

Flux<PartitionEvent> receive()

Consomme les événements de toutes les partitions à partir du début de chaque partition.

Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent)

Consomme les événements de toutes les partitions.

Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)

Consomme les événements de toutes les partitions configurées avec un ensemble de receiveOptions.

Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition)

Consomme les événements d’une partition unique à partir de startingPosition.

Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)

Consomme des événements à partir d’une partition unique en commençant par startingPosition un ensemble de ReceiveOptions.

Méthodes héritées de java.lang.Object

Détails de la méthode

close

public void close()

Supprime le consommateur en fermant la connexion sous-jacente au service.

getConsumerGroup

public String getConsumerGroup()

Obtient le groupe de consommateurs dont ce consommateur lit les événements dans le cadre de.

Returns:

Le groupe de consommateurs dont ce consommateur lit les événements dans le cadre de.

getEventHubName

public String getEventHubName()

Obtient le nom d’Event Hub avec lequel ce client interagit.

Returns:

Nom event Hub avec lequel ce client interagit.

getEventHubProperties

public Mono getEventHubProperties()

Récupère des informations sur un hub d’événements, notamment le nombre de partitions présentes et leurs identificateurs.

Returns:

Ensemble d’informations pour le hub d’événements auquel ce client est associé.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Obtient l’espace de noms Event Hubs complet auquel la connexion est associée. Cela est probablement similaire à {yournamespace}.servicebus.windows.net.

Returns:

Espace de noms Event Hubs complet auquel la connexion est associée

getIdentifier

public String getIdentifier()

Obtient l’identificateur du client.

Returns:

Chaîne d’identificateur unique pour le client actuel.

getPartitionIds

public Flux getPartitionIds()

Récupère les identificateurs des partitions d’un Event Hub.

Returns:

Flux d’identificateurs pour les partitions d’un Event Hub.

getPartitionProperties

public Mono getPartitionProperties(String partitionId)

Récupère des informations sur une partition spécifique pour un Event Hub, y compris des éléments qui décrivent les événements disponibles dans le flux d’événements de partition.

Parameters:

partitionId - Identificateur unique d’une partition associée à Event Hub.

Returns:

Ensemble d’informations pour la partition demandée sous le hub d’événements à laquelle ce client est associé.

receive

public Flux receive()

Consomme les événements de toutes les partitions à partir du début de chaque partition.

Cette méthode n’est pas recommandée pour une utilisation en production ; doit EventProcessorClient être utilisé pour lire les événements de toutes les partitions dans un scénario de production, car il offre une expérience beaucoup plus robuste avec un débit plus élevé. Il est important de noter que cette méthode ne garantit pas l’équité entre les partitions. Selon la communication du service, il peut y avoir un clustering d’événements par partition et/ou il peut y avoir un biais notable pour une partition ou un sous-ensemble de partitions donné.

Returns:

Flux d’événements pour chaque partition dans Event Hub à partir du début de chaque partition.

receive

public Flux receive(boolean startReadingAtEarliestEvent)

Consomme les événements de toutes les partitions.

Cette méthode n’est pas recommandée pour une utilisation en production ; doit EventProcessorClient être utilisé pour lire les événements de toutes les partitions dans un scénario de production, car il offre une expérience beaucoup plus robuste avec un débit plus élevé. Il est important de noter que cette méthode ne garantit pas l’équité entre les partitions. Selon la communication du service, il peut y avoir un clustering d’événements par partition et/ou il peut y avoir un biais notable pour une partition ou un sous-ensemble de partitions donné.

Parameters:

startReadingAtEarliestEvent - true pour commencer la lecture aux premiers événements disponibles dans chaque partition ; sinon, la lecture commence à la fin de chaque partition et voit uniquement les nouveaux événements à mesure qu’ils sont publiés.

Returns:

Flux d’événements pour chaque partition dans Event Hub.

receive

public Flux receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)

Consomme les événements de toutes les partitions configurées avec un ensemble de receiveOptions.

Cette méthode n’est pas recommandée pour une utilisation en production ; doit EventProcessorClient être utilisé pour lire les événements de toutes les partitions dans un scénario de production, car il offre une expérience beaucoup plus robuste avec un débit plus élevé. Il est important de noter que cette méthode ne garantit pas l’équité entre les partitions. Selon la communication du service, il peut y avoir un clustering d’événements par partition et/ou il peut y avoir un biais notable pour une partition ou un sous-ensemble de partitions donné.

  • Si la réception est appelée où getOwnerLevel() a une valeur, le service Event Hubs garantit qu’il n’existe qu’un seul consommateur actif par combinaison partitionId et groupe de consommateurs. Cette opération de réception est parfois appelée « consommateur d’époque ».
  • Vous pouvez créer plusieurs consommateurs par partitionId et une combinaison de groupes de consommateurs en ne définissant getOwnerLevel() pas lors de l’appel des opérations de réception. Ce consommateur non exclusif est parfois appelé « consommateur non-d’époque ».

Parameters:

startReadingAtEarliestEvent - true pour commencer la lecture aux premiers événements disponibles dans chaque partition ; sinon, la lecture commence à la fin de chaque partition et voit uniquement les nouveaux événements à mesure qu’ils sont publiés.
receiveOptions - Options lors de la réception d’événements de chaque partition Event Hub.

Returns:

Flux d’événements pour chaque partition dans Event Hub.

receiveFromPartition

public Flux receiveFromPartition(String partitionId, EventPosition startingPosition)

Consomme les événements d’une partition unique à partir de startingPosition.

Parameters:

partitionId - Identificateur de la partition à partir de laquelle lire les événements.
startingPosition - Position dans la partition Event Hub pour commencer à consommer des événements.

Returns:

Flux d’événements pour cette partition à partir de startingPosition.

receiveFromPartition

public Flux receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)

Consomme des événements à partir d’une partition unique en commençant par startingPosition un ensemble de ReceiveOptions.

  • Si la réception est appelée où getOwnerLevel() a une valeur, le service Event Hubs garantit qu’il n’existe qu’un seul consommateur actif par combinaison partitionId et groupe de consommateurs. Cette opération de réception est parfois appelée « consommateur d’époque ».
  • Vous pouvez créer plusieurs consommateurs par partitionId et une combinaison de groupes de consommateurs en ne définissant getOwnerLevel() pas lors de l’appel des opérations de réception. Ce consommateur non exclusif est parfois appelé « consommateur non-d’époque ».

Parameters:

partitionId - Identificateur de la partition à partir de laquelle lire les événements.
startingPosition - Position dans la partition Event Hub pour commencer à consommer des événements.
receiveOptions - Options lors de la réception d’événements de la partition.

Returns:

Flux d’événements pour cette partition. Si un flux pour les événements a été ouvert auparavant, la même position au sein de cette partition est retournée. Sinon, les événements sont lus à partir de startingPosition.

S’applique à