Partager via


EventHubProducerAsyncClient Classe

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventHubProducerAsyncClient

Implémente

public class EventHubProducerAsyncClient
implements Closeable

Producteur asynchrone chargé de EventData transmettre à un Event Hub spécifique, regroupé en lots. Selon le spécifié lors de la CreateBatchOptions création d’un EventDataBatch, les événements peuvent être automatiquement routés vers une partition disponible ou spécifiques à une partition. Informations supplémentaires et recommandations spécifiques pour les stratégies à utiliser lors de la publication d’événements se trouve dans : Distribuer des événements sur des partitions

Il est recommandé d’autoriser le routage automatique des partitions dans les cas suivants :

  • L’envoi d’événements doit être hautement disponible.
  • Les données d’événement doivent être réparties uniformément entre toutes les partitions disponibles.

Si aucun ID de partition n’est spécifié, les règles suivantes sont utilisées pour en sélectionner automatiquement un :

  1. Répartissez les événements de manière égale entre toutes les partitions disponibles à l’aide d’une approche de tourniquet (round robin).
  2. Si une partition devient indisponible, le service Event Hubs la détecte automatiquement et transfère le message vers une autre partition disponible.

Les exemples présentés dans ce document utilisent un objet d’informations d’identification nommé DefaultAzureCredential pour l’authentification, ce qui est approprié pour la plupart des scénarios, y compris les environnements de développement et de production locaux. En outre, nous vous recommandons d’utiliser l’identité managée pour l’authentification dans les environnements de production. Vous trouverez plus d’informations sur les différentes méthodes d’authentification et leurs types d’informations d’identification correspondants dans la documentation Azure Identity .

Exemple : Construire un EventHubProducerAsyncClient

L’exemple de code suivant illustre la création du client EventHubProducerAsyncClientasynchrone . fullyQualifiedNamespace est le nom d’hôte de l’espace de noms Event Hubs. Il est répertorié sous le volet « Essentials » après avoir accédé à l’espace de noms Event Hubs via le portail Azure.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventHubProducerAsyncClient producer = new EventHubClientBuilder()
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .buildAsyncProducerClient();

Exemple : Créer un producteur et publier des événements sur n’importe quelle partition

L’exemple de code suivant montre comment envoyer un ensemble d’événements à Event Hub. Les événements sont distribués via le routage automatique, car aucune option n’a été définie lors de la création du EventDataBatch via createBatch(). L’utilisation EventDataBatch est recommandée, car plusieurs événements peuvent être envoyés sur la connexion sous-jacente avec un seul message.

createBatch() et send(EventDataBatch batch) sont des appels non bloquants. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Mono doit être abonné, comme l’exemple ci-dessous, pour commencer à publier le lot d’événements.

// Creating a batch without options set, will allow for automatic routing of events to any partition.
 producer.createBatch().flatMap(batch -> {
     batch.tryAdd(new EventData("test-event-1"));
     batch.tryAdd(new EventData("test-event-2"));
     return producer.send(batch);
 }).subscribe(unused -> {
 },
     error -> System.err.println("Error occurred while sending batch:" + error),
     () -> System.out.println("Send complete."));

Exemple : Publier des événements sur la partition « 1 »

L’exemple de code suivant montre comment envoyer un ensemble d’événements à Event Hub vers la partition « 1 ». EventDataBatch via createBatch(CreateBatchOptions options). Les identificateurs de partition peuvent être obtenus à l’aide de getPartitionIds(). L’utilisation EventDataBatch est recommandée, car plusieurs événements peuvent être envoyés sur la connexion sous-jacente avec un seul message.

createBatch(CreateBatchOptions options) et send(EventDataBatch batch) sont des appels non bloquants. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Mono doit être abonné, comme l’exemple ci-dessous, pour commencer à publier le lot d’événements.

CreateBatchOptions options = new CreateBatchOptions().setPartitionId("1");
 producer.createBatch(options).flatMap(batch -> {
     batch.tryAdd(new EventData("test-event-1"));
     batch.tryAdd(new EventData("test-event-2"));
     return producer.send(batch);
 }).subscribe(unused -> {
 },
     error -> System.err.println("Error occurred while sending batch to partition 1:" + error),
     () -> System.out.println("Send to partition 1 complete."));

Exemple : Publier des événements sur la même partition, regroupés à l’aide de la clé de partition

Dans l’exemple de code ci-dessous, tous les événements avec la même clé de partition, « bread » sont envoyés à la même partition. Quand setPartitionId(String partitionId) est spécifié, il indique au service Event Hubs que ces événements appartiennent au même groupe et doivent appartenir à la même partition. Utile dans le cas où les développeurs veulent que les événements se terminent dans la même partition, mais ne se soucient pas de la partition dans laquelle ils se retrouvent.

createBatch(CreateBatchOptions options) et send(EventDataBatch batch) sont des appels non bloquants. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Mono doit être abonné, comme l’exemple ci-dessous, pour commencer à publier le lot d’événements.

CreateBatchOptions options = new CreateBatchOptions().setPartitionKey("bread");

 producer.createBatch(options).flatMap(batch -> {
     batch.tryAdd(new EventData("sourdough"));
     batch.tryAdd(new EventData("rye"));
     return producer.send(batch);
 }).subscribe(unused -> {
 },
     error -> System.err.println("Error occurred while sending batch:" + error),
     () -> System.out.println("Send complete."));

Exemple : Publier des événements à l’aide d’une taille limitée EventDataBatch

Dans l’exemple de code ci-dessous, tous les lots sont créés avec une taille maximale de 256 octets à l’aide setMaximumSizeInBytes(int maximumSizeInBytes) de est spécifié. Les événements à l’intérieur du lot sont automatiquement routés, car aucun ID de partition ou clé de partition n’est spécifié.

createBatch(CreateBatchOptions options) et send(EventDataBatch batch) sont des appels non bloquants. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Mono doit être abonné, comme l’exemple ci-dessous, pour commencer à publier le lot d’événements.

Flux<EventData> telemetryEvents = Flux.just(firstEvent, secondEvent);

 // Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
 // In this case, all the batches created with these options are limited to 256 bytes.
 CreateBatchOptions options = new CreateBatchOptions().setMaximumSizeInBytes(256);
 AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(
     producer.createBatch(options).block());

 // The sample Flux contains two events, but it could be an infinite stream of telemetry events.
 Disposable publishingOperation = telemetryEvents.flatMap(event -> {
     EventDataBatch batch = currentBatch.get();

     if (batch.tryAdd(event)) {
         return Mono.empty();
     }

     // Send the current batch then create another size-limited EventDataBatch and try to fit the event into
     // this new batch.
     return producer.send(batch).then(
         producer.createBatch(options).map(newBatch -> {
             currentBatch.set(newBatch);

             // Add the event that did not fit in the previous batch.
             if (!newBatch.tryAdd(event)) {
                 return Mono.error(new IllegalArgumentException(
                     "Event was too large to fit in an empty batch. Max size: "
                         + newBatch.getMaxSizeInBytes()));
             }

             return Mono.empty();
         }));
 }).subscribe(unused -> {
 }, error -> {
     System.out.println("Error occurred publishing events: " + error);
 }, () -> {
     System.out.println("Completed publishing operation.");
 });

Résumé de la méthode

Modificateur et type Méthode et description
void close()

Supprime .EventHubProducerAsyncClient

Mono<EventDataBatch> createBatch()

Crée un qui peut s’adapter EventDataBatch à autant d’événements que le transport le permet.

Mono<EventDataBatch> createBatch(CreateBatchOptions options)

Crée un EventDataBatch configuré avec les options spécifiées.

String getEventHubName()

Obtient le nom d’Event Hub avec lequel ce client interagit.

Mono<EventHubProperties> getEventHubProperties()

Récupère des informations sur un hub d’événements, notamment le nombre de partitions présentes et leurs identificateurs.

String getFullyQualifiedNamespace()

Obtient l’espace de noms Event Hubs complet auquel la connexion est associée.

String getIdentifier()

Obtient l’identificateur du client.

Flux<String> getPartitionIds()

Récupère les identificateurs des partitions d’un Event Hub.

Mono<PartitionProperties> getPartitionProperties(String partitionId)

Récupère des informations sur une partition spécifique pour un Event Hub, y compris des éléments qui décrivent les événements disponibles dans le flux d’événements de partition.

Mono<Void> send(EventDataBatch batch)

Envoie le lot au hub d’événements associé.

Mono<Void> send(Iterable<EventData> events)

Envoie un ensemble d’événements au hub d’événements associé à l’aide d’une approche par lots.

Mono<Void> send(Iterable<EventData> events, SendOptions options)

Envoie un ensemble d’événements au hub d’événements associé à l’aide d’une approche par lots.

Méthodes héritées de java.lang.Object

Détails de la méthode

close

public void close()

Supprime .EventHubProducerAsyncClient Si le client disposait d’une connexion dédiée, la connexion sous-jacente est également fermée.

createBatch

public Mono createBatch()

Crée un qui peut s’adapter EventDataBatch à autant d’événements que le transport le permet.

Returns:

Nouveau EventDataBatch qui peut s’adapter à autant d’événements que le transport le permet.

createBatch

public Mono createBatch(CreateBatchOptions options)

Crée un EventDataBatch configuré avec les options spécifiées.

Parameters:

options - Ensemble d’options utilisées pour configurer .EventDataBatch

Returns:

Nouveau EventDataBatch qui peut s’adapter à autant d’événements que le transport le permet.

getEventHubName

public String getEventHubName()

Obtient le nom d’Event Hub avec lequel ce client interagit.

Returns:

Nom event Hub avec lequel ce client interagit.

getEventHubProperties

public Mono getEventHubProperties()

Récupère des informations sur un hub d’événements, notamment le nombre de partitions présentes et leurs identificateurs.

Returns:

Ensemble d’informations pour le hub d’événements auquel ce client est associé.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Obtient l’espace de noms Event Hubs complet auquel la connexion est associée. Cela est probablement similaire à {yournamespace}.servicebus.windows.net.

Returns:

Espace de noms Event Hubs complet auquel la connexion est associée.

getIdentifier

public String getIdentifier()

Obtient l’identificateur du client.

Returns:

Chaîne d’identificateur unique pour le client actuel.

getPartitionIds

public Flux getPartitionIds()

Récupère les identificateurs des partitions d’un Event Hub.

Returns:

Flux d’identificateurs pour les partitions d’un Event Hub.

getPartitionProperties

public Mono getPartitionProperties(String partitionId)

Récupère des informations sur une partition spécifique pour un Event Hub, y compris des éléments qui décrivent les événements disponibles dans le flux d’événements de partition.

Parameters:

partitionId - Identificateur unique d’une partition associée à Event Hub.

Returns:

Ensemble d’informations pour la partition demandée sous le hub d’événements à laquelle ce client est associé.

send

public Mono send(EventDataBatch batch)

Envoie le lot au hub d’événements associé.

Parameters:

batch - Lot à envoyer au service.

Returns:

Mono qui se termine lorsque le lot est envoyé au service.

send

public Mono send(Iterable events)

Envoie un ensemble d’événements au hub d’événements associé à l’aide d’une approche par lots. Si la taille des événements dépasse la taille maximale d’un lot unique, une exception est déclenchée et l’envoi échoue. Par défaut, la taille du message correspond à la quantité maximale autorisée sur le lien.

List<EventData> events = Arrays.asList(new EventData("maple"), new EventData("aspen"),
     new EventData("oak"));

 producer.send(events)
     .subscribe(unused -> {
     },
         error -> System.err.println("Error occurred while sending events:" + error),
         () -> System.out.println("Send complete."));

Pour plus d’informations sur la taille maximale d’événement autorisée, consultez Azure Event Hubs quotas et limites.

Parameters:

events - Événements à envoyer au service.

Returns:

Mono qui se termine lorsque tous les événements sont envoyés au service.

send

public Mono send(Iterable events, SendOptions options)

Envoie un ensemble d’événements au hub d’événements associé à l’aide d’une approche par lots. Si la taille des événements dépasse la taille maximale d’un lot unique, une exception est déclenchée et l’envoi échoue. Par défaut, la taille du message correspond à la quantité maximale autorisée sur le lien.

List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
     new EventData("New York"));

 SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
 producer.send(events, sendOptions)
     .subscribe(unused -> {
     },
         error -> System.err.println("Error occurred while sending events:" + error),
         () -> System.out.println("Send complete."));

Pour plus d’informations sur la taille maximale d’événement autorisée, consultez Azure Event Hubs quotas et limites.

Parameters:

events - Événements à envoyer au service.
options - Ensemble d’options à prendre en compte lors de l’envoi de ce lot.

Returns:

Mono qui se termine lorsque tous les événements sont envoyés au service.

S’applique à