Opérations courantes avec la bibliothèque cliente Event Hubs

Effectué

Cette unité contient des exemples d’opérations courantes que vous pouvez effectuer avec la bibliothèque cliente Event Hubs (Azure.Messaging.EventHubs) pour interagir avec un hub d’événements.

Inspecter Event Hubs

De nombreuses opérations Event Hubs ont lieu dans l’étendue d’une partition spécifique. Event Hubs étant propriétaire des partitions, leurs noms sont affectés au moment de la création. Pour connaître les partitions disponibles, vous devez interroger les hubs d’événements avec un des clients Event Hubs. EventHubProducerClient est illustré dans ces exemples, mais le concept et la forme sont communs à tous les clients.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Publier des événements sur Event Hubs

Pour pouvoir publier des événements, vous devez créer un EventHubProducerClient. Les producteurs publient des événements par lots et peuvent demander une partition spécifique ou autoriser le service Event Hubs à décider sur quelle partition les événements doivent être publiés. Nous vous recommandons d’utiliser le routage automatique lorsque la publication d’événements doit être hautement disponible ou que les données d’événement doivent être réparties uniformément entre les partitions. Notre exemple tire parti du routage automatique.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Lire des événements dans un hub d’événements

Pour pouvoir lire des événements dans un hub d’événements, vous devez créer un EventHubConsumerClient pour un groupe de consommateurs donné. Quand un hub d’événements est créé, il fournit un groupe de consommateurs par défaut qui peut être utilisé pour commencer à explorer Event Hubs. Dans notre exemple, nous nous attachons à lire tous les événements publiés sur Event Hubs à l’aide d’un itérateur.

Remarque

Il est important de noter que cette approche de la consommation est destinée à améliorer l’expérience d’exploration et de prototypage de la bibliothèque cliente d’Event Hubs. Il est recommandé de ne pas l’utiliser dans les scénarios de production. Pour une utilisation en production, nous vous recommandons d’utiliser le client de processeur d’événements, car il offre une expérience plus robuste et plus performante.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Lire des événements dans une partition de hub d’événements

Pour lire à partir d’une partition spécifique, le consommateur doit spécifier à quel endroit du flux d’événements il souhaite commencer à recevoir des événements. Dans notre exemple, nous nous attachons à lire tous les événements publiés pour la première partition d’Event Hubs.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Traitement des événements avec un client de processeur d’événements

Dans la plupart des scénarios de production, il est recommandé d’utiliser EventProcessorClient pour lire et traiter les événements. Comme EventProcessorClient a une dépendance vis-à-vis de blobs de Stockage Azure pour la persistance de son état, vous devez fournir un BlobContainerClient pour le processeur, configuré pour le compte de stockage et le conteneur à utiliser.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}