EventHubProducerAsyncClient Classe
- java.
lang. Object - com.
azure. messaging. eventhubs. EventHubProducerAsyncClient
- com.
Implémente
public class EventHubProducerAsyncClient
implements Closeable
Producteur asynchrone chargé de EventData transmettre à un Event Hub spécifique, regroupé en lots. Selon le spécifié lors de la CreateBatchOptions création d’un EventDataBatch, les événements peuvent être automatiquement routés vers une partition disponible ou spécifiques à une partition. Informations supplémentaires et recommandations spécifiques pour les stratégies à utiliser lors de la publication d’événements se trouve dans : Distribuer des événements sur des partitions
Il est recommandé d’autoriser le routage automatique des partitions dans les cas suivants :
- L’envoi d’événements doit être hautement disponible.
- Les données d’événement doivent être réparties uniformément entre toutes les partitions disponibles.
Si aucun ID de partition n’est spécifié, les règles suivantes sont utilisées pour en sélectionner automatiquement un :
- Répartissez les événements de manière égale entre toutes les partitions disponibles à l’aide d’une approche de tourniquet (round robin).
- Si une partition devient indisponible, le service Event Hubs la détecte automatiquement et transfère le message vers une autre partition disponible.
Les exemples présentés dans ce document utilisent un objet d’informations d’identification nommé DefaultAzureCredential pour l’authentification, ce qui est approprié pour la plupart des scénarios, y compris les environnements de développement et de production locaux. En outre, nous vous recommandons d’utiliser l’identité managée pour l’authentification dans les environnements de production. Vous trouverez plus d’informations sur les différentes méthodes d’authentification et leurs types d’informations d’identification correspondants dans la documentation Azure Identity .
Exemple : Construire un EventHubProducerAsyncClient
L’exemple de code suivant illustre la création du client EventHubProducerAsyncClientasynchrone . fullyQualifiedNamespace
est le nom d’hôte de l’espace de noms Event Hubs. Il est répertorié sous le volet « Essentials » après avoir accédé à l’espace de noms Event Hubs via le portail Azure.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.buildAsyncProducerClient();
Exemple : Créer un producteur et publier des événements sur n’importe quelle partition
L’exemple de code suivant montre comment envoyer un ensemble d’événements à Event Hub. Les événements sont distribués via le routage automatique, car aucune option n’a été définie lors de la création du EventDataBatch via createBatch(). L’utilisation EventDataBatch est recommandée, car plusieurs événements peuvent être envoyés sur la connexion sous-jacente avec un seul message.
createBatch() et send(EventDataBatch batch) sont des appels non bloquants. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Mono
doit être abonné, comme l’exemple ci-dessous, pour commencer à publier le lot d’événements.
// Creating a batch without options set, will allow for automatic routing of events to any partition.
producer.createBatch().flatMap(batch -> {
batch.tryAdd(new EventData("test-event-1"));
batch.tryAdd(new EventData("test-event-2"));
return producer.send(batch);
}).subscribe(unused -> {
},
error -> System.err.println("Error occurred while sending batch:" + error),
() -> System.out.println("Send complete."));
Exemple : Publier des événements sur la partition « 1 »
L’exemple de code suivant montre comment envoyer un ensemble d’événements à Event Hub vers la partition « 1 ». EventDataBatch via createBatch(CreateBatchOptions options). Les identificateurs de partition peuvent être obtenus à l’aide de getPartitionIds(). L’utilisation EventDataBatch est recommandée, car plusieurs événements peuvent être envoyés sur la connexion sous-jacente avec un seul message.
createBatch(CreateBatchOptions options) et send(EventDataBatch batch) sont des appels non bloquants. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Mono
doit être abonné, comme l’exemple ci-dessous, pour commencer à publier le lot d’événements.
CreateBatchOptions options = new CreateBatchOptions().setPartitionId("1");
producer.createBatch(options).flatMap(batch -> {
batch.tryAdd(new EventData("test-event-1"));
batch.tryAdd(new EventData("test-event-2"));
return producer.send(batch);
}).subscribe(unused -> {
},
error -> System.err.println("Error occurred while sending batch to partition 1:" + error),
() -> System.out.println("Send to partition 1 complete."));
Exemple : Publier des événements sur la même partition, regroupés à l’aide de la clé de partition
Dans l’exemple de code ci-dessous, tous les événements avec la même clé de partition, « bread » sont envoyés à la même partition. Quand setPartitionId(String partitionId) est spécifié, il indique au service Event Hubs que ces événements appartiennent au même groupe et doivent appartenir à la même partition. Utile dans le cas où les développeurs veulent que les événements se terminent dans la même partition, mais ne se soucient pas de la partition dans laquelle ils se retrouvent.
createBatch(CreateBatchOptions options) et send(EventDataBatch batch) sont des appels non bloquants. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Mono
doit être abonné, comme l’exemple ci-dessous, pour commencer à publier le lot d’événements.
CreateBatchOptions options = new CreateBatchOptions().setPartitionKey("bread");
producer.createBatch(options).flatMap(batch -> {
batch.tryAdd(new EventData("sourdough"));
batch.tryAdd(new EventData("rye"));
return producer.send(batch);
}).subscribe(unused -> {
},
error -> System.err.println("Error occurred while sending batch:" + error),
() -> System.out.println("Send complete."));
Exemple : Publier des événements à l’aide d’une taille limitée EventDataBatch
Dans l’exemple de code ci-dessous, tous les lots sont créés avec une taille maximale de 256 octets à l’aide setMaximumSizeInBytes(int maximumSizeInBytes) de est spécifié. Les événements à l’intérieur du lot sont automatiquement routés, car aucun ID de partition ou clé de partition n’est spécifié.
createBatch(CreateBatchOptions options) et send(EventDataBatch batch) sont des appels non bloquants. Après avoir configuré l’opération, sa représentation asynchrone est retournée. Le Mono
doit être abonné, comme l’exemple ci-dessous, pour commencer à publier le lot d’événements.
Flux<EventData> telemetryEvents = Flux.just(firstEvent, secondEvent);
// Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
// In this case, all the batches created with these options are limited to 256 bytes.
CreateBatchOptions options = new CreateBatchOptions().setMaximumSizeInBytes(256);
AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(
producer.createBatch(options).block());
// The sample Flux contains two events, but it could be an infinite stream of telemetry events.
Disposable publishingOperation = telemetryEvents.flatMap(event -> {
EventDataBatch batch = currentBatch.get();
if (batch.tryAdd(event)) {
return Mono.empty();
}
// Send the current batch then create another size-limited EventDataBatch and try to fit the event into
// this new batch.
return producer.send(batch).then(
producer.createBatch(options).map(newBatch -> {
currentBatch.set(newBatch);
// Add the event that did not fit in the previous batch.
if (!newBatch.tryAdd(event)) {
return Mono.error(new IllegalArgumentException(
"Event was too large to fit in an empty batch. Max size: "
+ newBatch.getMaxSizeInBytes()));
}
return Mono.empty();
}));
}).subscribe(unused -> {
}, error -> {
System.out.println("Error occurred publishing events: " + error);
}, () -> {
System.out.println("Completed publishing operation.");
});
Résumé de la méthode
Modificateur et type | Méthode et description |
---|---|
void |
close()
Supprime .EventHubProducerAsyncClient |
Mono<Event |
createBatch()
Crée un qui peut s’adapter EventDataBatch à autant d’événements que le transport le permet. |
Mono<Event |
createBatch(CreateBatchOptions options)
Crée un EventDataBatch configuré avec les options spécifiées. |
String |
getEventHubName()
Obtient le nom d’Event Hub avec lequel ce client interagit. |
Mono<Event |
getEventHubProperties()
Récupère des informations sur un hub d’événements, notamment le nombre de partitions présentes et leurs identificateurs. |
String |
getFullyQualifiedNamespace()
Obtient l’espace de noms Event Hubs complet auquel la connexion est associée. |
String |
getIdentifier()
Obtient l’identificateur du client. |
Flux<String> |
getPartitionIds()
Récupère les identificateurs des partitions d’un Event Hub. |
Mono<Partition |
getPartitionProperties(String partitionId)
Récupère des informations sur une partition spécifique pour un Event Hub, y compris des éléments qui décrivent les événements disponibles dans le flux d’événements de partition. |
Mono<Void> |
send(EventDataBatch batch)
Envoie le lot au hub d’événements associé. |
Mono<Void> |
send(Iterable<EventData> events)
Envoie un ensemble d’événements au hub d’événements associé à l’aide d’une approche par lots. |
Mono<Void> |
send(Iterable<EventData> events, SendOptions options)
Envoie un ensemble d’événements au hub d’événements associé à l’aide d’une approche par lots. |
Méthodes héritées de java.lang.Object
Détails de la méthode
close
public void close()
Supprime .EventHubProducerAsyncClient Si le client disposait d’une connexion dédiée, la connexion sous-jacente est également fermée.
createBatch
public Mono
Crée un qui peut s’adapter EventDataBatch à autant d’événements que le transport le permet.
Returns:
createBatch
public Mono
Crée un EventDataBatch configuré avec les options spécifiées.
Parameters:
Returns:
getEventHubName
public String getEventHubName()
Obtient le nom d’Event Hub avec lequel ce client interagit.
Returns:
getEventHubProperties
public Mono
Récupère des informations sur un hub d’événements, notamment le nombre de partitions présentes et leurs identificateurs.
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
Obtient l’espace de noms Event Hubs complet auquel la connexion est associée. Cela est probablement similaire à {yournamespace}.servicebus.windows.net
.
Returns:
getIdentifier
public String getIdentifier()
Obtient l’identificateur du client.
Returns:
getPartitionIds
public Flux
Récupère les identificateurs des partitions d’un Event Hub.
Returns:
getPartitionProperties
public Mono
Récupère des informations sur une partition spécifique pour un Event Hub, y compris des éléments qui décrivent les événements disponibles dans le flux d’événements de partition.
Parameters:
Returns:
send
public Mono
Envoie le lot au hub d’événements associé.
Parameters:
Returns:
send
public Mono
Envoie un ensemble d’événements au hub d’événements associé à l’aide d’une approche par lots. Si la taille des événements dépasse la taille maximale d’un lot unique, une exception est déclenchée et l’envoi échoue. Par défaut, la taille du message correspond à la quantité maximale autorisée sur le lien.
List<EventData> events = Arrays.asList(new EventData("maple"), new EventData("aspen"),
new EventData("oak"));
producer.send(events)
.subscribe(unused -> {
},
error -> System.err.println("Error occurred while sending events:" + error),
() -> System.out.println("Send complete."));
Pour plus d’informations sur la taille maximale d’événement autorisée, consultez Azure Event Hubs quotas et limites.
Parameters:
Returns:
send
public Mono
Envoie un ensemble d’événements au hub d’événements associé à l’aide d’une approche par lots. Si la taille des événements dépasse la taille maximale d’un lot unique, une exception est déclenchée et l’envoi échoue. Par défaut, la taille du message correspond à la quantité maximale autorisée sur le lien.
List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
new EventData("New York"));
SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
producer.send(events, sendOptions)
.subscribe(unused -> {
},
error -> System.err.println("Error occurred while sending events:" + error),
() -> System.out.println("Send complete."));
Pour plus d’informations sur la taille maximale d’événement autorisée, consultez Azure Event Hubs quotas et limites.
Parameters:
Returns:
S’applique à
Azure SDK for Java