使用事件中心客户端库执行常见操作

已完成

本单元包含为与事件中心交互,可使用事件中心客户端库 (Azure.Messaging.EventHubs) 执行的常见操作的示例。

检查事件中心

许多事件中心操作都在特定分区范围内进行。 由于分区由事件中心拥有,因此将在创建时分配名称。 要了解哪些分区可用,可以使用一个事件中心客户端查询事件中心。 为了举例说明,在这些示例中演示了 EventHubProducerClient,但概念和形式在客户端之间是通用的。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

将事件发布到事件中心

若要发布事件,需要创建 EventHubProducerClient。 生成者分批发布事件,并可能请求特定分区,或允许事件中心服务决定应将事件发布到哪些分区。 当事件的发布需要高度可用或事件数据应该均匀分布在分区之间时,建议使用自动路由。 我们的示例将利用自动路由。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

从事件中心读取事件

若要从事件中心读取事件,将需要为给定的使用者组创建 EventHubConsumerClient。 创建事件中心时,它会提供一个默认的使用者组,可用来开始探索事件中心。 在我们的示例中,我们将着重介绍如何使用枚举器读取已发布到事件中心的所有事件。

注意

需要注意的是,这种使用方法旨在改善探索事件中心客户端库和原型制作的体验。 建议不要用于生产情形。 对于生产用途,建议使用事件处理器客户端,因为它可以提供更强大和更高效的体验。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

从事件中心分区读取事件

若要从特定分区读取,使用者需要指定要从事件流的什么位置开始接收事件。 在本示例中,我们将着重介绍如何读取事件中心第一个分区的所有已发布事件。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

使用事件处理器客户端处理事件

对于大多数生产场景,建议使用 EventProcessorClient 来读取和处理事件。 由于 EventProcessorClient 依赖于 Azure 存储 blob 以保持其状态,因此需要为处理器提供 BlobContainerClient,并且该客户端已针对应使用的存储帐户和容器进行了配置。

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}