Event Hubs 클라이언트 라이브러리를 사용하여 일반 작업 수행

완료됨

이 단원에는 이벤트 허브와 상호 작용하기 위해 Event Hubs 클라이언트 라이브러리(Azure.Messaging.EventHubs)로 수행할 수 있는 일반적인 작업의 예가 포함되어 있습니다.

Event Hubs 검사

많은 Event Hubs 작업은 특정 파티션 범위 내에서 수행됩니다. Event Hubs는 파티션을 소유하기 때문에 만들 때 해당 이름이 할당됩니다. 사용할 수 있는 파티션을 이해하려면 Event Hubs 클라이언트 중 하나를 사용하여 이벤트 허브를 쿼리합니다. 예를 들어 다음 예제에서는 EventHubProducerClient를 보여 주지만 개념 및 형식은 클라이언트 간에 공통적입니다.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Event Hubs에 이벤트 게시

이벤트를 게시하려면 EventHubProducerClient를 만들어야 합니다. 생산자는 이벤트를 일괄 처리로 게시하고 특정 파티션을 요청하거나 Event Hubs 서비스가 게시할 파티션 이벤트를 결정하도록 허용할 수 있습니다. 이벤트 게시가 고가용성이어야 하거나 이벤트 데이터를 파티션 간에 균등하게 분산해야 하는 경우 자동 라우팅을 사용하는 것이 좋습니다. 예제에서는 자동 라우팅을 활용합니다.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Event Hubs에서 이벤트 읽기

Event Hubs에서 이벤트를 읽으려면 특정 소비자 그룹에 대한 EventHubConsumerClient를 만들어야 합니다. Event Hubs를 만들면 이벤트 허브 탐색을 시작하는 데 사용할 수 있는 기본 소비자 그룹을 제공합니다. 예제에서는 반복기를 사용하여 Event Hubs에 게시된 모든 이벤트를 읽는 데 초점을 맞춥니다.

참고 항목

이 접근 방법은 Event Hubs 클라이언트 라이브러리 및 프로토타이핑을 탐색하는 환경을 개선하기 위한 것입니다. 프로덕션 시나리오에서는 사용하지 않는 것이 좋습니다. 프로덕션에서는 보다 강력하고 성능의 환경을 제공하는 이벤트 프로세서 클라이언트를 사용하는 것이 좋습니다.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Event Hubs 파티션에서 이벤트 읽기

특정 파티션에서 읽으려면 소비자는 이벤트 스트림에서 이벤트 수신을 시작할 위치를 지정해야 합니다. 이 예제에서는 Event Hubs의 첫 번째 파티션에 대해 게시된 모든 이벤트를 읽는 데 중점을 둡니다.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

이벤트 프로세서 클라이언트를 사용하여 이벤트 처리

대부분의 프로덕션 시나리오에서는 이벤트를 읽고 처리하는 데 EventProcessorClient를 사용하는 것이 좋습니다. EventProcessorClient는 상태의 지속성을 위해 Azure Storage Blob에 의존하기 때문에 사용해야 하는 스토리지 계정 및 컨테이너에 대해 구성된 프로세서에 대한 BlobContainerClient를 제공해야 합니다.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}