Event Hubs 클라이언트 라이브러리를 사용하여 일반 작업 수행
이 단원에는 이벤트 허브와 상호 작용하기 위해 Event Hubs 클라이언트 라이브러리(Azure.Messaging.EventHubs
)로 수행할 수 있는 일반적인 작업의 예가 포함되어 있습니다.
Event Hubs 검사
많은 Event Hubs 작업은 특정 파티션 범위 내에서 수행됩니다. Event Hubs는 파티션을 소유하기 때문에 만들 때 해당 이름이 할당됩니다. 사용할 수 있는 파티션을 이해하려면 Event Hubs 클라이언트 중 하나를 사용하여 이벤트 허브를 쿼리합니다. 예를 들어 다음 예제에서는 EventHubProducerClient
를 보여 주지만 개념 및 형식은 클라이언트 간에 공통적입니다.
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
string[] partitionIds = await producer.GetPartitionIdsAsync();
}
Event Hubs에 이벤트 게시
이벤트를 게시하려면 EventHubProducerClient
를 만들어야 합니다. 생산자는 이벤트를 일괄 처리로 게시하고 특정 파티션을 요청하거나 Event Hubs 서비스가 게시할 파티션 이벤트를 결정하도록 허용할 수 있습니다. 이벤트 게시가 고가용성이어야 하거나 이벤트 데이터를 파티션 간에 균등하게 분산해야 하는 경우 자동 라우팅을 사용하는 것이 좋습니다. 예제에서는 자동 라우팅을 활용합니다.
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
using EventDataBatch eventBatch = await producer.CreateBatchAsync();
eventBatch.TryAdd(new EventData(new BinaryData("First")));
eventBatch.TryAdd(new EventData(new BinaryData("Second")));
await producer.SendAsync(eventBatch);
}
Event Hubs에서 이벤트 읽기
Event Hubs에서 이벤트를 읽으려면 특정 소비자 그룹에 대한 EventHubConsumerClient
를 만들어야 합니다. Event Hubs를 만들면 이벤트 허브 탐색을 시작하는 데 사용할 수 있는 기본 소비자 그룹을 제공합니다. 예제에서는 반복기를 사용하여 Event Hubs에 게시된 모든 이벤트를 읽는 데 초점을 맞춥니다.
참고 항목
이 접근 방법은 Event Hubs 클라이언트 라이브러리 및 프로토타이핑을 탐색하는 환경을 개선하기 위한 것입니다. 프로덕션 시나리오에서는 사용하지 않는 것이 좋습니다. 프로덕션에서는 보다 강력하고 성능의 환경을 제공하는 이벤트 프로세서 클라이언트를 사용하는 것이 좋습니다.
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
{
// At this point, the loop will wait for events to be available in the Event Hub. When an event
// is available, the loop will iterate with the event that was received. Because we did not
// specify a maximum wait time, the loop will wait forever unless cancellation is requested using
// the cancellation token.
}
}
Event Hubs 파티션에서 이벤트 읽기
특정 파티션에서 읽으려면 소비자는 이벤트 스트림에서 이벤트 수신을 시작할 위치를 지정해야 합니다. 이 예제에서는 Event Hubs의 첫 번째 파티션에 대해 게시된 모든 이벤트를 읽는 데 중점을 둡니다.
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
EventPosition startingPosition = EventPosition.Earliest;
string partitionId = (await consumer.GetPartitionIdsAsync()).First();
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
{
// At this point, the loop will wait for events to be available in the partition. When an event
// is available, the loop will iterate with the event that was received. Because we did not
// specify a maximum wait time, the loop will wait forever unless cancellation is requested using
// the cancellation token.
}
}
이벤트 프로세서 클라이언트를 사용하여 이벤트 처리
대부분의 프로덕션 시나리오에서는 이벤트를 읽고 처리하는 데 EventProcessorClient
를 사용하는 것이 좋습니다. EventProcessorClient
는 상태의 지속성을 위해 Azure Storage Blob에 의존하기 때문에 사용해야 하는 스토리지 계정 및 컨테이너에 대해 구성된 프로세서에 대한 BlobContainerClient
를 제공해야 합니다.
var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";
Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
await processor.StartProcessingAsync();
try
{
// The processor performs its work in the background; block until cancellation
// to allow processing to take place.
await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
// This is expected when the delay is canceled.
}
try
{
await processor.StopProcessingAsync();
}
finally
{
// To prevent leaks, the handlers should be removed when processing is complete.
processor.ProcessEventAsync -= processEventHandler;
processor.ProcessErrorAsync -= processErrorHandler;
}