Ausführen allgemeiner Vorgänge mit der Event Hubs-Clientbibliothek

Abgeschlossen

Diese Lerneinheit enthält Beispiele für allgemeine Vorgänge, die Sie mit der Event Hubs-Clientbibliothek (Azure.Messaging.EventHubs) ausführen können, um mit einem Event Hub zu interagieren.

Untersuchen von Event Hubs

Viele Event Hub-Vorgänge finden im Bereichs einer bestimmten Partition statt. Da sich Partitionen im Besitz des Event Hubs befinden, werden ihre Namen zum Zeitpunkt der Erstellung zugewiesen. Um zu verstehen, welche Partitionen verfügbar sind, fragen Sie den Event Hub mit einem der Event Hub-Clients ab. Zur Veranschaulichung wird in diesen Beispielen EventHubProducerClient verwendet, das Konzept und die Form sind aber clientübergreifend gleich.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Veröffentlichen von Ereignissen für Event Hubs

Um Ereignisse zu veröffentlichen, müssen Sie einen EventHubProducerClient erstellen. Producer veröffentlichen Ereignisse in Batches und können eine bestimmte Partition anfordern oder dem Event Hubs-Dienst die Entscheidung erlauben, auf welcher Partition Ereignisse veröffentlicht werden sollen. Es wird empfohlen, automatisches Routing zu verwenden, wenn die Veröffentlichung von Ereignissen hochverfügbar sein muss oder wenn Ereignisdaten gleichmäßig auf die Partitionen verteilt werden sollen. In unserem Beispiel wird automatisches Routing genutzt.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Lesen von Ereignissen aus einem Event Hub

Um Ereignisse aus einem Event Hub zu lesen, müssen Sie einen EventHubConsumerClient für die jeweilige Consumergruppe erstellen. Wenn ein Event Hub erstellt wird, stellt er eine Standardconsumergruppe bereit, die verwendet werden kann, um mit dem Erkunden von Event Hubs zu beginnen. In unserem Beispiel konzentrieren wir uns auf das Lesen aller Ereignisse, die mithilfe eines Iterators im Event Hub veröffentlicht wurden.

Hinweis

Es ist wichtig zu beachten, dass dieser Nutzungsansatz dazu dient, die Untersuchung der Event Hubs-Clientbibliothek und die Prototyperstellung zu verbessern. Es wird empfohlen, diesen Ansatz nicht in Produktionsszenarien zu verwenden. Für die Verwendung in der Produktion empfehlen wir den Ereignisprozessorclient, da er eine stabilere und leistungsfähigere Erfahrung bietet.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Lesen von Ereignissen aus einer Event Hubs-Partition

Um aus einer bestimmten Partition zu lesen, muss der Verbraucher angeben, wo im Ereignisdatenstrom mit dem Empfang von Ereignissen begonnen werden soll. In unserem Beispiel konzentrieren wir uns auf das Lesen aller veröffentlichten Ereignisse für die erste Partition der Event Hubs.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Verarbeiten von Ereignissen mithilfe eines Ereignisprozessorclients

Für die meisten Produktionsszenarien empfiehlt es sich, EventProcessorClient zum Lesen und Verarbeiten von Ereignissen zu verwenden. Da EventProcessorClient eine Abhängigkeit von Azure Storage-Blobs für die Persistenz des Zustands aufweist, müssen Sie einen BlobContainerClient für den Prozessor bereitstellen, der für das Speicherkonto und den Container konfiguriert wurde, die verwendet werden sollen.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}