EventHubConsumerAsyncClient Classe
- java.
lang. Object - com.
azure. messaging. eventhubs. EventHubConsumerAsyncClient
- com.
Implémente
public class EventHubConsumerAsyncClient
implements Closeable
Consommateur asynchrone responsable de la lecture EventData à partir d’une partition Event Hub spécifique ou de toutes les partitions dans le contexte d’un groupe de consommateurs spécifique.
Les exemples présentés dans ce document utilisent un objet d’informations d’identification nommé DefaultAzureCredential pour l’authentification, ce qui est approprié pour la plupart des scénarios, y compris les environnements de développement et de production locaux. En outre, nous vous recommandons d’utiliser l’identité managée pour l’authentification dans les environnements de production. Vous trouverez plus d’informations sur les différentes méthodes d’authentification et leurs types d’informations d’identification correspondants dans la documentation Azure Identity .
Exemple : création d’un EventHubConsumerAsyncClient
L’exemple de code suivant illustre la création du client EventHubConsumerAsyncClientasynchrone . fullyQualifiedNamespace
est le nom d’hôte de l’espace de noms Event Hubs. Il est répertorié sous le volet « Essentials » après avoir accédé à l’espace de noms Event Hubs via le portail Azure. consumerGroup
Le est trouvé en accédant à l’instance Event Hub, puis en sélectionnant « Groupes de consommateurs » sous le panneau « Entités ». consumerGroup(String consumerGroup) est requis pour créer des clients consommateurs. Les informations d’identification utilisées sont DefaultAzureCredential
dues au fait qu’elles combinent les informations d’identification couramment utilisées dans le déploiement et le développement, et qu’elles choisissent les informations d’identification à utiliser en fonction de leur environnement en cours d’exécution.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.buildAsyncProducerClient();
Exemple : Consommation d’événements d’une partition unique à partir d’Event Hub
L’exemple de code ci-dessous illustre la réception d’événements à partir de la partition « 0 » d’un hub d’événements à partir de latest(). latest() pointe vers la fin du flux de partition. Le consommateur reçoit les événements en file d’attente après avoir commencé à s’abonner aux événements.
receiveFromPartition(String partitionId, EventPosition startingPosition) est un appel non bloquant. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Flux
doit être abonné, comme l’exemple ci-dessous, pour commencer à recevoir des événements.
EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
new DefaultAzureCredentialBuilder().build())
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildAsyncConsumerClient();
// Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
String partitionId = "0";
EventPosition startingPosition = EventPosition.latest();
// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
//
// NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
// operation. If the program ends after this, or the class is immediately disposed, no events will be
// received.
Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
.subscribe(partitionEvent -> {
PartitionContext partitionContext = partitionEvent.getPartitionContext();
EventData event = partitionEvent.getData();
System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
}, error -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.err.print("An error occurred:" + error);
}, () -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.out.print("Stream has ended.");
});
Exemple : Inclure les dernières informations de partition dans les événements reçus
EventData peut être décoré avec les dernières informations de partition et envoyé aux consommateurs. Activez cette option en définissant sur setTrackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)true
. Au fur et à mesure que des événements entrent, explorez l’objet PartitionEvent . Cela est utile dans les scénarios où les clients souhaitent obtenir des informations à jour constantes sur leur Hub d’événements. Cela prend un impact sur les performances, car les informations de partition supplémentaires doivent être envoyées sur le câble à chaque événement.
receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions) est un appel non bloquant. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Flux
doit être abonné, comme l’exemple ci-dessous, pour commencer à recevoir des événements.
// Set `setTrackLastEnqueuedEventProperties` to true to get the last enqueued information from the partition for
// each event that is received.
ReceiveOptions receiveOptions = new ReceiveOptions()
.setTrackLastEnqueuedEventProperties(true);
EventPosition startingPosition = EventPosition.earliest();
// Receives events from partition "0" starting at the beginning of the stream.
// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
Disposable subscription = consumer.receiveFromPartition("0", startingPosition, receiveOptions)
.subscribe(partitionEvent -> {
LastEnqueuedEventProperties properties = partitionEvent.getLastEnqueuedEventProperties();
System.out.printf("Information received at %s. Last enqueued sequence number: %s%n",
properties.getRetrievalTime(),
properties.getSequenceNumber());
});
Exemple : Limitation de la consommation des événements à partir d’Event Hub
Pour les consommateurs d’événements qui doivent limiter le nombre d’événements qu’ils reçoivent à un moment donné, ils peuvent utiliser BaseSubscriber#request(long). L’utilisation d’un abonné personnalisé permet aux développeurs de contrôler plus précisément la fréquence à laquelle ils reçoivent les événements.
receiveFromPartition(String partitionId, EventPosition startingPosition) est un appel non bloquant. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Flux
doit être abonné, comme l’exemple ci-dessous, pour commencer à recevoir des événements.
consumer.receiveFromPartition(partitionId, EventPosition.latest()).subscribe(new BaseSubscriber<PartitionEvent>() {
private static final int NUMBER_OF_EVENTS = 5;
private final AtomicInteger currentNumberOfEvents = new AtomicInteger();
@Override
protected void hookOnSubscribe(Subscription subscription) {
// Tell the Publisher we only want 5 events at a time.
request(NUMBER_OF_EVENTS);
}
@Override
protected void hookOnNext(PartitionEvent value) {
// Process the EventData
// If the number of events we have currently received is a multiple of 5, that means we have reached the
// last event the Publisher will provide to us. Invoking request(long) here, tells the Publisher that
// the subscriber is ready to get more events from upstream.
if (currentNumberOfEvents.incrementAndGet() % 5 == 0) {
request(NUMBER_OF_EVENTS);
}
}
});
Exemple : réception de toutes les partitions
L’exemple de code ci-dessous illustre la réception d’événements de toutes les partitions d’un Event Hub à partir du début du flux de chaque partition. Cela est utile à des fins de démonstration, mais n’est pas destiné aux scénarios de production. Pour les scénarios de production, envisagez d’utiliser EventProcessorClient.
receive(boolean startReadingAtEarliestEvent) est un appel non bloquant. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Flux
doit être abonné, comme l’exemple ci-dessous, pour commencer à recevoir des événements.
// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
Disposable subscription = consumer.receive(true)
.subscribe(partitionEvent -> {
PartitionContext context = partitionEvent.getPartitionContext();
EventData event = partitionEvent.getData();
System.out.printf("Event %s is from partition %s%n.", event.getSequenceNumber(),
context.getPartitionId());
}, error -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.err.print("An error occurred:" + error);
}, () -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.out.print("Stream has ended.");
});
Résumé de la méthode
Modificateur et type | Méthode et description |
---|---|
void |
close()
Supprime le consommateur en fermant la connexion sous-jacente au service. |
String |
getConsumerGroup()
Obtient le groupe de consommateurs dont ce consommateur lit les événements dans le cadre de. |
String |
getEventHubName()
Obtient le nom d’Event Hub avec lequel ce client interagit. |
Mono<Event |
getEventHubProperties()
Récupère des informations sur un hub d’événements, notamment le nombre de partitions présentes et leurs identificateurs. |
String |
getFullyQualifiedNamespace()
Obtient l’espace de noms Event Hubs complet auquel la connexion est associée. |
String |
getIdentifier()
Obtient l’identificateur du client. |
Flux<String> |
getPartitionIds()
Récupère les identificateurs des partitions d’un Event Hub. |
Mono<Partition |
getPartitionProperties(String partitionId)
Récupère des informations sur une partition spécifique pour un Event Hub, y compris des éléments qui décrivent les événements disponibles dans le flux d’événements de partition. |
Flux<Partition |
receive()
Consomme les événements de toutes les partitions à partir du début de chaque partition. |
Flux<Partition |
receive(boolean startReadingAtEarliestEvent)
Consomme les événements de toutes les partitions. |
Flux<Partition |
receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)
Consomme les événements de toutes les partitions configurées avec un ensemble de |
Flux<Partition |
receiveFromPartition(String partitionId, EventPosition startingPosition)
Consomme les événements d’une partition unique à partir de |
Flux<Partition |
receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)
Consomme des événements à partir d’une partition unique en commençant par |
Méthodes héritées de java.lang.Object
Détails de la méthode
close
public void close()
Supprime le consommateur en fermant la connexion sous-jacente au service.
getConsumerGroup
public String getConsumerGroup()
Obtient le groupe de consommateurs dont ce consommateur lit les événements dans le cadre de.
Returns:
getEventHubName
public String getEventHubName()
Obtient le nom d’Event Hub avec lequel ce client interagit.
Returns:
getEventHubProperties
public Mono
Récupère des informations sur un hub d’événements, notamment le nombre de partitions présentes et leurs identificateurs.
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
Obtient l’espace de noms Event Hubs complet auquel la connexion est associée. Cela est probablement similaire à {yournamespace}.servicebus.windows.net
.
Returns:
getIdentifier
public String getIdentifier()
Obtient l’identificateur du client.
Returns:
getPartitionIds
public Flux
Récupère les identificateurs des partitions d’un Event Hub.
Returns:
getPartitionProperties
public Mono
Récupère des informations sur une partition spécifique pour un Event Hub, y compris des éléments qui décrivent les événements disponibles dans le flux d’événements de partition.
Parameters:
Returns:
receive
public Flux
Consomme les événements de toutes les partitions à partir du début de chaque partition.
Cette méthode n’est pas recommandée pour une utilisation en production ; doit EventProcessorClient être utilisé pour lire les événements de toutes les partitions dans un scénario de production, car il offre une expérience beaucoup plus robuste avec un débit plus élevé. Il est important de noter que cette méthode ne garantit pas l’équité entre les partitions. Selon la communication du service, il peut y avoir un clustering d’événements par partition et/ou il peut y avoir un biais notable pour une partition ou un sous-ensemble de partitions donné.
Returns:
receive
public Flux
Consomme les événements de toutes les partitions.
Cette méthode n’est pas recommandée pour une utilisation en production ; doit EventProcessorClient être utilisé pour lire les événements de toutes les partitions dans un scénario de production, car il offre une expérience beaucoup plus robuste avec un débit plus élevé. Il est important de noter que cette méthode ne garantit pas l’équité entre les partitions. Selon la communication du service, il peut y avoir un clustering d’événements par partition et/ou il peut y avoir un biais notable pour une partition ou un sous-ensemble de partitions donné.
Parameters:
true
pour commencer la lecture aux premiers événements disponibles dans chaque partition ; sinon, la lecture commence à la fin de chaque partition et voit uniquement les nouveaux événements à mesure qu’ils sont publiés.
Returns:
receive
public Flux
Consomme les événements de toutes les partitions configurées avec un ensemble de receiveOptions
.
Cette méthode n’est pas recommandée pour une utilisation en production ; doit EventProcessorClient être utilisé pour lire les événements de toutes les partitions dans un scénario de production, car il offre une expérience beaucoup plus robuste avec un débit plus élevé. Il est important de noter que cette méthode ne garantit pas l’équité entre les partitions. Selon la communication du service, il peut y avoir un clustering d’événements par partition et/ou il peut y avoir un biais notable pour une partition ou un sous-ensemble de partitions donné.
- Si la réception est appelée où getOwnerLevel() a une valeur, le service Event Hubs garantit qu’il n’existe qu’un seul consommateur actif par combinaison partitionId et groupe de consommateurs. Cette opération de réception est parfois appelée « consommateur d’époque ».
- Vous pouvez créer plusieurs consommateurs par partitionId et une combinaison de groupes de consommateurs en ne définissant getOwnerLevel() pas lors de l’appel des opérations de réception. Ce consommateur non exclusif est parfois appelé « consommateur non-d’époque ».
Parameters:
true
pour commencer la lecture aux premiers événements disponibles dans chaque partition ; sinon, la lecture commence à la fin de chaque partition et voit uniquement les nouveaux événements à mesure qu’ils sont publiés.
Returns:
receiveFromPartition
public Flux
Consomme les événements d’une partition unique à partir de startingPosition
.
Parameters:
Returns:
startingPosition
.receiveFromPartition
public Flux
Consomme des événements à partir d’une partition unique en commençant par startingPosition
un ensemble de ReceiveOptions.
- Si la réception est appelée où getOwnerLevel() a une valeur, le service Event Hubs garantit qu’il n’existe qu’un seul consommateur actif par combinaison partitionId et groupe de consommateurs. Cette opération de réception est parfois appelée « consommateur d’époque ».
- Vous pouvez créer plusieurs consommateurs par partitionId et une combinaison de groupes de consommateurs en ne définissant getOwnerLevel() pas lors de l’appel des opérations de réception. Ce consommateur non exclusif est parfois appelé « consommateur non-d’époque ».
Parameters:
Returns:
startingPosition
.S’applique à
Azure SDK for Java