使用事件中樞用戶端程式庫執行一般作業
本單元包含您可以使用[事件中樞] 用戶端程式庫 (Azure.Messaging.EventHubs
) 執行常見作業以與 [事件中樞] 互動的範例。
檢查 [事件中樞]
許多 [事件中樞] 作業都會在特定分割區的範圍內進行。 由於事件中樞擁有分割區,因此會在建立時指派其名稱。 若要瞭解可用的分割區,您可以使用其中一個 [事件中樞] 用戶端來查詢 [事件中樞]。 這些範例中示範了 EventHubProducerClient
作為說明,但概念和形式可在用戶端之間通用。
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
string[] partitionIds = await producer.GetPartitionIdsAsync();
}
將事件發佈至 [事件中樞]
為了發佈事件,您必須建立 EventHubProducerClient
。 產生者會以批次方式發佈事件,且可能會要求特定分割區,或允許事件中樞服務決定應該發佈至哪個分割區事件。 當發佈事件需要高度可用,或事件資料應該平均分散至分割區時,建議使用自動路由。 我們的範例會利用自動路由。
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
using EventDataBatch eventBatch = await producer.CreateBatchAsync();
eventBatch.TryAdd(new EventData(new BinaryData("First")));
eventBatch.TryAdd(new EventData(new BinaryData("Second")));
await producer.SendAsync(eventBatch);
}
從 [事件中樞] 讀取事件
為了從 [事件中樞] 讀取事件,您必須為指定的取用者群組建立 EventHubConsumerClient
。 建立 [事件中樞] 時,其會提供可用來開始探索 [事件中樞] 的預設取用者群組。 在我們的範例中,我們將著重於使用迭代器讀取發佈至 [事件中樞] 的所有事件。
注意
請務必注意,這個取用方法旨在改善探索事件中樞用戶端程式庫和原型設計的體驗。 建議不要在生產案例中使用。 針對生產環境使用,我們建議使用事件處理器用戶端,因為其提供更可靠且高效能的體驗。
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
{
// At this point, the loop will wait for events to be available in the Event Hub. When an event
// is available, the loop will iterate with the event that was received. Because we did not
// specify a maximum wait time, the loop will wait forever unless cancellation is requested using
// the cancellation token.
}
}
從 [事件中樞] 分割區讀取事件
若要從特定分割區讀取,取用者必須指定事件資料流中的何處開始接收事件。 在我們的範例中,我們著重於讀取事件中樞第一個分割區的所有已發佈事件。
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
EventPosition startingPosition = EventPosition.Earliest;
string partitionId = (await consumer.GetPartitionIdsAsync()).First();
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
{
// At this point, the loop will wait for events to be available in the partition. When an event
// is available, the loop will iterate with the event that was received. Because we did not
// specify a maximum wait time, the loop will wait forever unless cancellation is requested using
// the cancellation token.
}
}
使用事件處理器用戶端處理事件
在大部分的生產案例中,建議將 EventProcessorClient
用於讀取和處理事件。 由於 EventProcessorClient
相依於 Azure 儲存體 Blob 以維持其狀態,因此您必須為處理器提供 BlobContainerClient
,而其已針對應使用的儲存體帳戶和容器進行設定。
var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";
Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
await processor.StartProcessingAsync();
try
{
// The processor performs its work in the background; block until cancellation
// to allow processing to take place.
await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
// This is expected when the delay is canceled.
}
try
{
await processor.StopProcessingAsync();
}
finally
{
// To prevent leaks, the handlers should be removed when processing is complete.
processor.ProcessEventAsync -= processEventHandler;
processor.ProcessErrorAsync -= processErrorHandler;
}