Распространенные операции с клиентской библиотекой Центров событий

Завершено

В этом уроке содержатся примеры распространенных операций, которые можно выполнять с клиентской библиотекой Центров событий (Azure.Messaging.EventHubs) для взаимодействия с Центрами событий.

Проверка центров событий

Многие операции Центров событий выполняются в пределах определенной секции. Так как Центры событий принадлежат секциям, их имена назначаются во время создания. Чтобы понять, какие секции доступны, необходимо запросить центры событий с помощью одного из клиентов Центров событий. В примерах показано EventHubProducerClient, но суть и форма одинаковы для всех клиентов.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Публикация событий в Центрах событий

Чтобы опубликовать события, необходимо создать EventHubProducerClient. Производители публикуют события в пакетах и могут запрашивать определенную секцию или разрешать службе Центров событий решать, какие события секции следует публиковать. Мы советуем использовать автоматическую маршрутизацию, если у публикации событий должен быть высокий уровень доступности или если данные событий должны распределяться равномерно между секциями. В нашем примере используется автоматическая маршрутизация.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Чтение событий из Центров событий

Чтобы считывать события из Центров событий, необходимо создать EventHubConsumerClient для определенной группы потребителей. При создании Центров событий она предоставляет группу потребителей по умолчанию, которую можно использовать для начала изучения центров событий. В нашем примере мы сосредоточимся на чтении всех событий, опубликованных в Центрах событий с помощью итератора.

Примечание.

Важно отметить, что этот подход к потреблению предназначен для улучшения работы с клиентской библиотекой Центров событий и создания прототипов. Рекомендуется не использовать его в рабочих сценариях. Для использования в рабочей среде рекомендуем выбрать клиент обработчика событий, так как он обеспечивает более надежную и производительную работу.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Чтение событий из раздела Центров событий

Чтобы прочитать из определенной секции, потребитель должен указать, где в потоке событий начать получение событий. В нашем примере мы сосредоточимся на чтении всех опубликованных событий для первой секции Центров событий.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Обработка событий с помощью клиента обработчика событий

Для большинства рабочих сценариев рекомендуется использовать EventProcessorClient для чтения и обработки событий. EventProcessorClient Так как у него есть зависимость от служба хранилища Azure БОЛЬШИХ двоичных объектов для сохранения его состояния, необходимо указать BlobContainerClient обработчик, который был настроен для учетной записи хранения и контейнера, который должен использоваться.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}