使用事件中心客户端库执行常见操作
本单元包含为与事件中心交互,可使用事件中心客户端库 (Azure.Messaging.EventHubs
) 执行的常见操作的示例。
检查事件中心
许多事件中心操作都在特定分区范围内进行。 由于分区由事件中心拥有,因此将在创建时分配名称。 要了解哪些分区可用,可以使用一个事件中心客户端查询事件中心。 为了举例说明,在这些示例中演示了 EventHubProducerClient
,但概念和形式在客户端之间是通用的。
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
string[] partitionIds = await producer.GetPartitionIdsAsync();
}
将事件发布到事件中心
若要发布事件,需要创建 EventHubProducerClient
。 生成者分批发布事件,并可能请求特定分区,或允许事件中心服务决定应将事件发布到哪些分区。 当事件的发布需要高度可用或事件数据应该均匀分布在分区之间时,建议使用自动路由。 我们的示例将利用自动路由。
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
using EventDataBatch eventBatch = await producer.CreateBatchAsync();
eventBatch.TryAdd(new EventData(new BinaryData("First")));
eventBatch.TryAdd(new EventData(new BinaryData("Second")));
await producer.SendAsync(eventBatch);
}
从事件中心读取事件
若要从事件中心读取事件,将需要为给定的使用者组创建 EventHubConsumerClient
。 创建事件中心时,它会提供一个默认的使用者组,可用来开始探索事件中心。 在我们的示例中,我们将着重介绍如何使用枚举器读取已发布到事件中心的所有事件。
注意
需要注意的是,这种使用方法旨在改善探索事件中心客户端库和原型制作的体验。 建议不要用于生产情形。 对于生产用途,建议使用事件处理器客户端,因为它可以提供更强大和更高效的体验。
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
{
// At this point, the loop will wait for events to be available in the Event Hub. When an event
// is available, the loop will iterate with the event that was received. Because we did not
// specify a maximum wait time, the loop will wait forever unless cancellation is requested using
// the cancellation token.
}
}
从事件中心分区读取事件
若要从特定分区读取,使用者需要指定要从事件流的什么位置开始接收事件。 在本示例中,我们将着重介绍如何读取事件中心第一个分区的所有已发布事件。
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
EventPosition startingPosition = EventPosition.Earliest;
string partitionId = (await consumer.GetPartitionIdsAsync()).First();
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
{
// At this point, the loop will wait for events to be available in the partition. When an event
// is available, the loop will iterate with the event that was received. Because we did not
// specify a maximum wait time, the loop will wait forever unless cancellation is requested using
// the cancellation token.
}
}
使用事件处理器客户端处理事件
对于大多数生产场景,建议使用 EventProcessorClient
来读取和处理事件。 由于 EventProcessorClient
依赖于 Azure 存储 blob 以保持其状态,因此需要为处理器提供 BlobContainerClient
,并且该客户端已针对应使用的存储帐户和容器进行了配置。
var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";
Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
await processor.StartProcessingAsync();
try
{
// The processor performs its work in the background; block until cancellation
// to allow processing to take place.
await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
// This is expected when the delay is canceled.
}
try
{
await processor.StopProcessingAsync();
}
finally
{
// To prevent leaks, the handlers should be removed when processing is complete.
processor.ProcessEventAsync -= processEventHandler;
processor.ProcessErrorAsync -= processErrorHandler;
}