Event Hubs クライアント ライブラリで一般的な操作を実行する
このユニットには、Event Hubs クライアントライブラリ (Azure.Messaging.EventHubs
) を使用してイベント ハブと対話するために実行できる一般的な操作の例が含まれています。
Event Hubs について調べる
Event Hubs の多くの操作は、特定のパーティションのスコープ内で実行されます。 Event Hubs がパーティションを所有しているため、その名前は作成時に割り当てられます。 使用できるパーティションを理解するには、Event Hubs クライアントのいずれかを使って、Event Hubs のクエリを実行します。 これらの例では、図示するために EventHubProducerClient
を例として示していますが、概念と形式はクライアント全体で共通です。
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
string[] partitionIds = await producer.GetPartitionIdsAsync();
}
イベントを Event Hubs に発行する
イベントを発行するには、EventHubProducerClient
を作成する必要があります。 プロデューサーは、イベントをバッチで発行し、特定のパーティションを要求できます。また、イベントの発行先のパーティションを Event Hubs サービスで決定することもできます。 イベントの発行で高可用性が必要な場合、またはイベント データをパーティション間で均等に配分する必要がある場合は、自動ルーティングを使用することをお勧めします。 この例では、自動ルーティングを利用します。
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
using EventDataBatch eventBatch = await producer.CreateBatchAsync();
eventBatch.TryAdd(new EventData(new BinaryData("First")));
eventBatch.TryAdd(new EventData(new BinaryData("Second")));
await producer.SendAsync(eventBatch);
}
イベント ハブからイベントを読み取る
イベント ハブからイベントを読み取るには、特定のコンシューマー グループ用の EventHubConsumerClient
を作成する必要があります。 イベント ハブを作成するときは、Event Hubs の探索を始めるために使用できる既定のコンシューマー グループが用意されています。 この例では、Event Hubs に発行されたすべてのイベントを、反復子を使って読み取ることに焦点を当てます。
Note
重要な注意点は、このアプローチが Event Hubs クライアント ライブラリの探索とプロトタイプ作成のエクスペリエンスを向上させることを目的としていることです。 運用環境のシナリオでは使用しないことをお勧めします。 運用環境で使用する場合は、堅牢で高パフォーマンスのエクスペリエンスを提供するため、イベント プロセッサ クライアントを使用することをお勧めします。
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
{
// At this point, the loop will wait for events to be available in the Event Hub. When an event
// is available, the loop will iterate with the event that was received. Because we did not
// specify a maximum wait time, the loop will wait forever unless cancellation is requested using
// the cancellation token.
}
}
Event Hubs のパーティションからイベントを読み取る
特定のパーティションから読み取るために、コンシューマーはイベント ストリーム内でイベントの受信を開始する場所を指定する必要があります。 この例では、Event Hubs の最初のパーティションについて、発行されたすべてのイベントを読み取ることに焦点を当てます。
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
EventPosition startingPosition = EventPosition.Earliest;
string partitionId = (await consumer.GetPartitionIdsAsync()).First();
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
{
// At this point, the loop will wait for events to be available in the partition. When an event
// is available, the loop will iterate with the event that was received. Because we did not
// specify a maximum wait time, the loop will wait forever unless cancellation is requested using
// the cancellation token.
}
}
イベント プロセッサ クライアントを使用してイベントを処理する
ほとんどの運用シナリオでは、イベントの読み取りと処理に EventProcessorClient
を使用することをお勧めします。 EventProcessorClient
は、Azure Storage BLOB を使って状態を永続化するため、プロセッサに BlobContainerClient
を指定する必要があります。これは、使用する必要があるストレージ アカウントとコンテナーに構成されています。
var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";
Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
await processor.StartProcessingAsync();
try
{
// The processor performs its work in the background; block until cancellation
// to allow processing to take place.
await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
// This is expected when the delay is canceled.
}
try
{
await processor.StopProcessingAsync();
}
finally
{
// To prevent leaks, the handlers should be removed when processing is complete.
processor.ProcessEventAsync -= processEventHandler;
processor.ProcessErrorAsync -= processErrorHandler;
}