Utföra vanliga åtgärder med Event Hubs-klientbiblioteket

Slutförd

Den här lektionen innehåller exempel på vanliga åtgärder som du kan utföra med Event Hubs-klientbiblioteket (Azure.Messaging.EventHubs) för att interagera med en händelsehubb.

Inspektera Event Hubs

Många Event Hubs-åtgärder sker inom ramen för en specifik partition. Eftersom Event Hubs äger partitionerna tilldelas deras namn när de skapas. För att förstå vilka partitioner som är tillgängliga frågar du Event Hubs med hjälp av en av Event Hubs-klienterna. Som illustration visas i de här exemplen EventHubProducerClient , men konceptet och formuläret är vanliga mellan klienter.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Publicera händelser till Event Hubs

För att kunna publicera händelser måste du skapa en EventHubProducerClient. Producenter publicerar händelser i batchar och kan begära en specifik partition, eller låta Event Hubs-tjänsten bestämma vilka partitionshändelser som ska publiceras till. Vi rekommenderar att du använder automatisk routning när publiceringen av händelser måste vara högtillgänglig eller när händelsedata ska fördelas jämnt mellan partitionerna. Vårt exempel drar nytta av automatisk routning.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Läsa händelser från en händelsehubb

För att kunna läsa händelser från en händelsehubb måste du skapa en EventHubConsumerClient för en viss konsumentgrupp. När en händelsehubb skapas tillhandahåller den en standardkonsumentgrupp som kan användas för att komma igång med att utforska Event Hubs. I vårt exempel fokuserar vi på att läsa alla händelser som publicerats till Event Hubs med hjälp av en iterator.

Kommentar

Det är viktigt att observera att den här metoden för användning är avsedd att förbättra upplevelsen av att utforska Event Hubs-klientbiblioteket och prototyper. Vi rekommenderar att den inte används i produktionsscenarier. För produktionsanvändning rekommenderar vi att du använder eventprocessorklienten eftersom den ger en mer robust och högpresterande upplevelse.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Läsa händelser från en Event Hubs-partition

För att kunna läsa från en specifik partition måste konsumenten ange var i händelseströmmen som ska börja ta emot händelser. I vårt exempel fokuserar vi på att läsa alla publicerade händelser för den första partitionen av Event Hubs.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Bearbeta händelser med en händelseprocessorklient

I de flesta produktionsscenarier är rekommendationen att använda EventProcessorClient för att läsa och bearbeta händelser. EventProcessorClient Eftersom har ett beroende av Azure Storage-blobar för beständighet av dess tillstånd måste du ange en BlobContainerClient för processorn, som har konfigurerats för lagringskontot och containern som ska användas.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}