Wykonywanie typowych operacji za pomocą biblioteki klienta usługi Event Hubs

Ukończone

Ta lekcja zawiera przykłady typowych operacji, które można wykonać za pomocą biblioteki klienta usługi Event Hubs (Azure.Messaging.EventHubs) w celu interakcji z usługą Event Hubs.

Inspekcja usługi Event Hubs

Wiele operacji usługi Event Hubs odbywa się w zakresie określonej partycji. Ponieważ usługa Event Hubs jest właścicielem partycji, ich nazwy są przypisywane podczas tworzenia. Aby dowiedzieć się, jakie partycje są dostępne, należy wykonać zapytanie względem usługi Event Hubs przy użyciu jednego z klientów usługi Event Hubs. Na ilustracji przedstawiono to EventHubProducerClient w tych przykładach, ale koncepcja i forma są wspólne dla klientów.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Publikowanie zdarzeń w usłudze Event Hubs

Aby publikować zdarzenia, należy utworzyć element EventHubProducerClient. Producenci publikują zdarzenia w partiach i mogą zażądać określonej partycji lub zezwolić usłudze Event Hubs na podjęcie decyzji o tym, które zdarzenia partycji mają zostać opublikowane. Zalecamy używanie routingu automatycznego, gdy publikowanie zdarzeń musi być wysoce dostępne lub gdy dane zdarzenia powinny być równomiernie dystrybuowane między partycjami. Nasz przykład korzysta z automatycznego routingu.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Odczytywanie zdarzeń z usługi Event Hubs

Aby odczytywać zdarzenia z usługi Event Hubs, należy utworzyć dla EventHubConsumerClient danej grupy odbiorców. Po utworzeniu usługi Event Hubs udostępnia ona domyślną grupę odbiorców, która może służyć do rozpoczęcia eksplorowania usługi Event Hubs. W naszym przykładzie koncentrujemy się na odczytywaniu wszystkich zdarzeń publikowanych w usłudze Event Hubs przy użyciu iteratora.

Uwaga

Należy pamiętać, że takie podejście do używania ma na celu poprawę środowiska eksplorowania biblioteki klienta usługi Event Hubs i tworzenia prototypów. Zalecane jest, aby nie były używane w scenariuszach produkcyjnych. W przypadku użycia w środowisku produkcyjnym zalecamy użycie klienta procesora zdarzeń, ponieważ zapewnia bardziej niezawodne i wydajne środowisko.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Odczytywanie zdarzeń z partycji usługi Event Hubs

Aby odczytać z określonej partycji, użytkownik musi określić miejsce w strumieniu zdarzeń, aby rozpocząć odbieranie zdarzeń. W naszym przykładzie koncentrujemy się na odczytywaniu wszystkich opublikowanych zdarzeń dla pierwszej partycji usługi Event Hubs.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Przetwarzanie zdarzeń przy użyciu klienta procesora zdarzeń

W przypadku większości scenariuszy produkcyjnych zaleca się użycie EventProcessorClient polecenia do odczytywania i przetwarzania zdarzeń. EventProcessorClient Ponieważ obiekt ma zależność od obiektów blob usługi Azure Storage w celu utrzymania stanu, należy podać BlobContainerClient dla procesora, który został skonfigurowany dla konta magazynu i kontenera, który powinien być używany.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}