Delen via


Azure Event Hubs Event Processor-clientbibliotheek voor .NET - versie 5.8.1

Azure Event Hubs is een uiterst schaalbare service voor publiceren/abonneren waarmee miljoenen gebeurtenissen per seconde kunnen worden opgenomen en naar meerdere consumenten kunnen worden gestreamd. Hiermee kunt u de enorme hoeveelheden gegevens die door uw verbonden apparaten en toepassingen worden geproduceerd, verwerken en analyseren. Zodra Event Hubs de gegevens heeft verzameld, kunt u deze ophalen, transformeren en opslaan met behulp van een realtime analyseprovider of met batchverwerkings-/opslagadapters. Als u meer wilt weten over Azure Event Hubs, kunt u het volgende lezen: Wat is Event Hubs.

De gebeurtenisprocessorclientbibliotheek is een aanvulling op de Azure Event Hubs clientbibliotheek en biedt een zelfstandige client voor het gebruik van gebeurtenissen op een robuuste, duurzame en schaalbare manier die geschikt is voor de meeste productiescenario's. Een eigen implementatie die is gebouwd met behulp van Azure Storage-blobs. De gebeurtenisprocessor wordt aanbevolen voor:

  • Gebeurtenissen op alle partities van een Event Hub op schaal lezen en verwerken met tolerantie voor tijdelijke storingen en onregelmatige netwerkproblemen.

  • Gebeurtenissen gezamenlijk verwerken, waarbij meerdere processors dynamisch de verantwoordelijkheid distribueren en delen in de context van een consumentengroep, waarbij de belasting probleemloos wordt beheerd wanneer processors worden toegevoegd en verwijderd uit de groep.

  • Controlepunten en statussen voor verwerking op een duurzame manier beheren met Behulp van Azure Storage-blobs als het onderliggende gegevensarchief.

Broncode | Pakket (NuGet) | API-referentiedocumentatie | Productdocumentatie | Gids voor probleemoplossing

Aan de slag

Vereisten

  • Azure-abonnement: Als u Azure-services wilt gebruiken, waaronder Azure Event Hubs, hebt u een abonnement nodig. Als u geen bestaand Azure-account hebt, kunt u zich registreren voor een gratis proefversie of de voordelen van uw Visual Studio Subscription gebruiken wanneer u een account maakt.

  • Event Hubs-naamruimte met een Event Hub: Als u wilt communiceren met Azure Event Hubs, moet u ook een naamruimte en Event Hub beschikbaar hebben. Als u niet bekend bent met het maken van Azure-resources, kunt u de stapsgewijze handleiding voor het maken van een Event Hub volgen met behulp van de Azure Portal. Daar vindt u ook gedetailleerde instructies voor het gebruik van de Azure CLI-, Azure PowerShell- of Arm-sjablonen (Azure Resource Manager) om een Event Hub te maken.

  • Azure Storage-account met blobopslag: Als u controlepunten wilt behouden en het eigendom wilt beheren in Azure Storage, moet u een Azure Storage-account met beschikbare blobs hebben. Als u niet bekend bent met Azure Storage-accounts, kunt u de stapsgewijze handleiding voor het maken van een opslagaccount volgen met behulp van de Azure Portal. Daar vindt u ook gedetailleerde instructies voor het gebruik van de Azure CLI-, Azure PowerShell- of Arm-sjablonen (Azure Resource Manager) om opslagaccounts te maken.

  • Azure Storage-blobcontainer: Controlepunt- en eigendomsgegevens in Azure Storage worden geschreven naar blobs in een specifieke container. De EventProcessorClient vereist een bestaande container en maakt er niet impliciet een om te beschermen tegen onbedoelde onjuiste configuratie. Als u niet bekend bent met Azure Storage-containers, kunt u de documentatie over het beheren van containers raadplegen. Hier vindt u gedetailleerde instructies voor het gebruik van .NET, de Azure CLI of Azure PowerShell om een container te maken.

  • C# 8.0: De Azure Event Hubs-clientbibliotheek maakt gebruik van nieuwe functies die zijn geïntroduceerd in C# 8.0. Als u wilt profiteren van de C# 8.0-syntaxis, wordt u aangeraden te compileren met behulp van de .NET Core SDK 3.0 of hoger met een taalversie van latest.

    Visual Studio-gebruikers die volledig willen profiteren van de C# 8.0-syntaxis, moeten Visual Studio 2019 of hoger gebruiken. Visual Studio 2019, met inbegrip van de gratis Community-editie, hier worden gedownload. Gebruikers van Visual Studio 2017 kunnen profiteren van de C# 8-syntaxis door gebruik te maken van het Microsoft.Net.Compilers NuGet-pakket en de taalversie in te stellen, hoewel de bewerkingservaring mogelijk niet ideaal is.

    U kunt de bibliotheek nog steeds gebruiken met eerdere C#-taalversies, maar u moet asynchrone opsommings- en asynchrone wegwerpleden handmatig beheren in plaats van te profiteren van de nieuwe syntaxis. U kunt zich nog steeds richten op elke frameworkversie die wordt ondersteund door uw .NET Core SDK, inclusief eerdere versies van .NET Core of het .NET Framework. Zie Doelframeworks opgeven voor meer informatie.

    Belangrijke opmerking: Als u de voorbeelden en voorbeelden wilt bouwen of uitvoeren zonder wijziging, is het gebruik van C# 11.0 nodig. U kunt de voorbeelden nog steeds uitvoeren als u besluit ze aan te passen voor andere taalversies.

Als u snel de benodigde resources in Azure wilt maken en hiervoor verbindingsreeksen wilt ontvangen, kunt u onze voorbeeldsjabloon implementeren door te klikken op:

Implementeren op Azure

Het pakket installeren

Installeer de Azure Event Hubs Event Processor-clientbibliotheek voor .NET met behulp van NuGet:

dotnet add package Azure.Messaging.EventHubs.Processor

De client verifiëren

Een Event Hubs-connection string verkrijgen

Als u wilt dat de Event Hubs-clientbibliotheek kan communiceren met een Event Hub, moet deze weten hoe u verbinding kunt maken en ermee kunt autoriseren. De eenvoudigste manier om dit te doen, is het gebruik van een connection string, die automatisch wordt gemaakt bij het maken van een Event Hubs-naamruimte. Als u niet bekend bent met het gebruik van verbindingsreeksen met Event Hubs, kunt u de stapsgewijze handleiding volgen om een Event Hubs-connection string op te halen.

Een Azure Storage-connection string verkrijgen

Als u wilt dat de gebeurtenisprocessorclient gebruik kan maken van Azure Storage-blobs voor controlepunten, moet deze weten hoe u verbinding maakt met een opslagaccount en hiermee autoriseert. De eenvoudigste methode hiervoor is het gebruik van een connection string, die wordt gegenereerd op het moment dat het opslagaccount wordt gemaakt. Als u niet bekend bent met het opslagaccount connection string autorisatie in Azure, kunt u de stapsgewijze handleiding voor het configureren van Azure Storage-verbindingsreeksen volgen.

Belangrijkste concepten

  • Een gebeurtenisprocessor is een constructie die is bedoeld voor het beheren van de verantwoordelijkheden die zijn gekoppeld aan het maken van verbinding met een bepaalde Event Hub en het verwerken van gebeurtenissen van elk van de partities, in de context van een specifieke consumentengroep. Het verwerken van gebeurtenissen die worden gelezen van de partitie en het verwerken van eventuele fouten die optreden, wordt door de gebeurtenisprocessor gedelegeerd aan de code die u opgeeft, zodat uw logica zich kan concentreren op het leveren van bedrijfswaarde terwijl de processor de taken afhandelt die zijn gekoppeld aan leesgebeurtenissen, het beheren van de partities en het behouden van de status in de vorm van controlepunten.

  • Controlepunten is een proces waarmee lezers hun positie markeren en behouden voor gebeurtenissen die zijn verwerkt voor een partitie. Controlepunten zijn de verantwoordelijkheid van de consument en vindt plaats per partitie, meestal in de context van een specifieke consumentengroep. Voor de EventProcessorClientbetekent dit dat voor een combinatie van consumentengroepen en partities de processor de huidige positie in de gebeurtenisstroom moet bijhouden. Als u meer informatie wilt, raadpleegt u controlepunten in de productdocumentatie van Event Hubs.

    Wanneer een gebeurtenisprocessor verbinding maakt, begint deze gebeurtenissen te lezen op het controlepunt dat eerder werd bewaard door de laatste processor van die partitie in die consumentengroep, indien aanwezig. Als een gebeurtenisprocessor gebeurtenissen in de partitie leest en erop reageert, moet deze periodiek controlepunten maken om de gebeurtenissen te markeren als 'voltooid' door downstreamtoepassingen en om tolerantie te bieden als een gebeurtenisprocessor of de omgeving die deze host. Indien nodig is het mogelijk om gebeurtenissen die eerder als 'voltooid' waren gemarkeerd, opnieuw te verwerken door een eerdere verschuiving op te geven via dit controlepuntproces.

  • Een partitie is een geordende reeks gebeurtenissen die wordt gehouden in een Event Hub. Partities zijn een methode voor gegevensorganisatie die is gekoppeld aan de parallelle uitvoering die is vereist door gebeurtenisgebruikers. Azure Event Hubs biedt berichtstreaming via een gepartitioneerd consumentenpatroon waarbij elke consument alleen een specifieke subset of partitie van de berichtenstroom leest. Als er nieuwere gebeurtenissen plaatsvinden, worden deze toegevoegd aan het einde van deze reeks. Het aantal partities wordt opgegeven op het moment dat een Event Hub wordt gemaakt en kan niet worden gewijzigd.

  • Een consumentengroep is een weergave van een hele Event Hub. Met consumentengroepen kunnen meerdere verbruikende toepassingen elk een afzonderlijke weergave van de gebeurtenisstroom hebben en de stream onafhankelijk in hun eigen tempo en vanuit hun eigen positie lezen. Er kunnen maximaal 5 gelijktijdige lezers op een partitie per consumentengroep zijn; Het wordt echter aanbevolen dat er slechts één actieve consument is voor een bepaalde koppeling tussen partities en consumentengroepen. Elke actieve lezer ontvangt alle gebeurtenissen van zijn partitie; Als er meerdere lezers op dezelfde partitie zijn, ontvangen ze dubbele gebeurtenissen.

Zie Functies van Event Hubs voor meer concepten en diepere discussies.

Clientlevensduur

De EventProcessorClient is veilig in de cache en te gebruiken als een singleton voor de levensduur van de toepassing. Dit is de best practice wanneer gebeurtenissen regelmatig worden gelezen. De clients zijn verantwoordelijk voor efficiënt beheer van het netwerk-, CPU- en geheugengebruik, waarbij het gebruik laag blijft tijdens perioden van inactiviteit. Het aanroepen StopProcessingAsync van of StopProcessing op de processor is vereist om ervoor te zorgen dat netwerkbronnen en andere onbeheerde objecten correct worden opgeschoond.

Veiligheid van schroefdraad

We garanderen dat alle clientexemplaarmethoden thread-veilig en onafhankelijk van elkaar zijn (richtlijn). Dit zorgt ervoor dat de aanbeveling om clientexemplaren opnieuw te gebruiken altijd veilig is, zelfs voor alle threads.

De gegevensmodeltypen, zoals EventData en EventDataBatch , zijn niet thread-veilig. Ze mogen niet worden gedeeld tussen threads en mogen niet gelijktijdig met clientmethoden worden gebruikt.

Aanvullende concepten

Clientopties | Gebeurtenishandlers | Afhandeling van fouten | Diagnostics | Mocking (processor) | Mocking (clienttypen)

Voorbeelden

Een gebeurtenisprocessorclient maken

Omdat de EventProcessorClient afhankelijk is van Azure Storage-blobs voor persistentie van de status, moet u een BlobContainerClient opgeven voor de processor, die is geconfigureerd voor het opslagaccount en de container die moeten worden gebruikt. De container die wordt gebruikt om de EventProcessorClient te configureren, moet bestaan.

Omdat de EventProcessorClient op geen enkele manier de intentie heeft om een container op te geven die niet bestaat, wordt de container niet impliciet gemaakt. Dit fungeert als bescherming tegen een onjuist geconfigureerde container die een frauduleuze processor veroorzaakt die het eigendom niet kan delen en andere processors in de consumentengroep verstoort.

// The container specified when creating the BlobContainerClient must exist; it will
// not be implicitly created.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

De gebeurtenis- en fout-handlers configureren

Als u de EventProcessorClientwilt gebruiken, moeten handlers voor gebeurtenisverwerking en fouten worden opgegeven. Deze handlers worden beschouwd als op zichzelf staand en ontwikkelaars zijn verantwoordelijk om ervoor te zorgen dat uitzonderingen in de handlercode worden verwerkt.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

Verwerking starten en stoppen

De EventProcessorClient voert de verwerking op de achtergrond uit zodra deze expliciet is gestart en blijft dit doen totdat het expliciet is gestopt. Hoewel de toepassingscode hierdoor andere taken kan uitvoeren, is het ook de verantwoordelijkheid om ervoor te zorgen dat het proces niet wordt beëindigd tijdens de verwerking als er geen andere taken worden uitgevoerd.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

Een Active Directory-principal gebruiken met de gebeurtenisprocessorclient

De Azure Identity-bibliotheek biedt ondersteuning voor Azure Active Directory-verificatie die kan worden gebruikt voor de Azure-clientbibliotheken, waaronder Event Hubs en Azure Storage.

Als u gebruik wilt maken van een Active Directory-principal, wordt een van de beschikbare referenties uit de Azure.Identity bibliotheek opgegeven bij het maken van de Event Hubs-client. Daarnaast worden de volledig gekwalificeerde Event Hubs-naamruimte en de naam van de gewenste Event Hub opgegeven in plaats van de Event Hubs-connection string.

Als u gebruik wilt maken van een Active Directory-principal met Azure Storage-blobcontainers, moet de volledig gekwalificeerde URL naar de container worden opgegeven bij het maken van de opslagclient. Meer informatie over de geldige URI-indelingen voor toegang tot Blob Storage vindt u in Containers, blobs en metagegevens naam geven en hiernaar verwijzen.

var credential = new DefaultAzureCredential();
var blobStorageUrl ="<< FULLY-QUALIFIED CONTAINER URL (like https://myaccount.blob.core.windows.net/mycontainer) >>";

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(new Uri(blobStorageUrl), credential);

var processor = new EventProcessorClient
(
    storageClient,
    consumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    credential
);

Wanneer u Azure Active Directory gebruikt met Event Hubs, moet aan uw principal een rol worden toegewezen die het lezen van Event Hubs mogelijk maakt, zoals de Azure Event Hubs Data Receiver rol. Raadpleeg de bijbehorende documentatie voor meer informatie over het gebruik van Azure Active Directory-autorisatie met Event Hubs.

Wanneer u Azure Active Directory gebruikt met Azure Storage, moet aan uw principal een rol worden toegewezen waarmee u toegang tot blobs kunt lezen, schrijven en verwijderen, zoals de Storage Blob Data Contributor rol. Raadpleeg de bijbehorende documentatie en het Azure Storage-autorisatievoorbeeld voor meer informatie over het gebruik van Active Directory-autorisatie met Azure Storage.

Problemen oplossen

Raadpleeg de Handleiding voor het oplossen van problemen met Event Hubs voor gedetailleerde informatie over het oplossen van problemen.

Afhandeling van uitzonderingen

Gebeurtenisprocessorclientuitzondering

De gebeurtenisprocessorclient doet elke poging om tolerant te zijn in het geval van uitzonderingen en zal de nodige acties ondernemen om de verwerking voort te zetten, tenzij dit onmogelijk is. Er is geen actie van ontwikkelaars nodig om dit te laten plaatsvinden; het maakt standaard deel uit van het gedrag van de processor.

Om ontwikkelaars de mogelijkheid te bieden om uitzonderingen te inspecteren en erop te reageren die zich voordoen binnen de clientbewerkingen van de gebeurtenisprocessor, worden ze via de ProcessError gebeurtenis weergegeven. De argumenten voor deze gebeurtenis bieden details over de uitzondering en de context waarin deze is waargenomen. Ontwikkelaars kunnen normale bewerkingen uitvoeren op de gebeurtenisprocessorclient vanuit deze gebeurtenis-handler, zoals stoppen en/of opnieuw starten als reactie op fouten, maar hebben mogelijk geen andere invloed op het uitzonderingsgedrag van de processor.

Voor een eenvoudig voorbeeld van het implementeren van de fouthandler raadpleegt u het voorbeeld: Gebeurtenisprocessorhandlers.

Uitzonderingen in gebeurtenis-handlers

Omdat de Event Processor-client niet de juiste context heeft om de ernst van uitzonderingen te begrijpen binnen de gebeurtenis-handlers die ontwikkelaars bieden, kan deze niet aannemen welke acties een redelijke reactie hierop zouden zijn. Als gevolg hiervan worden ontwikkelaars beschouwd als verantwoordelijk voor uitzonderingen die optreden binnen de gebeurtenis-handlers die ze bieden met behulp van try/catch blokken en andere standaardtaalconstructies.

De Event Processor-client probeert geen uitzonderingen in ontwikkelaarscode te detecteren en deze ook niet expliciet aan te brengen. Het resulterende gedrag is afhankelijk van de hostingomgeving van de processor en de context waarin de gebeurtenis-handler is aangeroepen. Omdat dit kan variëren tussen verschillende scenario's, is het raadzaam dat ontwikkelaars hun gebeurtenis-handlers defensief codeert en rekening houdt met mogelijke uitzonderingen.

Logboekregistratie en diagnostische gegevens

De Event Processor-clientbibliotheek is volledig geïnstrueerd voor het vastleggen van gegevens op verschillende detailniveaus met behulp van .NET EventSource om informatie te verzenden. Logboekregistratie wordt uitgevoerd voor elke bewerking en volgt het patroon van het markeren van het beginpunt van de bewerking, de voltooiing ervan en eventuele uitzonderingen die zijn opgetreden. Aanvullende informatie die mogelijk inzicht biedt, wordt ook vastgelegd in de context van de bijbehorende bewerking.

De logboeken van de Event Processor-client zijn beschikbaar voor iedereen EventListener door u aan te melden voor de bron met de naam 'Azure-Messaging-EventHubs-Processor-EventProcessorClient' of door u aan te melden voor alle bronnen die de eigenschap AzureEventSource hebben. Om het vastleggen van logboeken uit de Azure-clientbibliotheken gemakkelijker te maken, biedt de Azure.Core bibliotheek die wordt gebruikt door Event Hubs een AzureEventSourceListener. Meer informatie vindt u in Event Hubs-logboeken vastleggen met behulp van de AzureEventSourceListener.

De bibliotheek gebeurtenisprocessor is ook geïnstrueerd voor gedistribueerde tracering met behulp van Application Insights of OpenTelemetry. Meer informatie vindt u in het azure.Core Diagnostics-voorbeeld.

Volgende stappen

Naast de besproken scenario's biedt de bibliotheek Azure Event Hubs Processor ondersteuning voor aanvullende scenario's om te profiteren van de volledige functieset van de EventProcessorClient. Om een aantal van deze scenario's te verkennen, biedt de clientbibliotheek van Event Hubs Processor een project met voorbeelden als illustratie voor veelvoorkomende scenario's. Raadpleeg de voorbeelden LEESMIJ voor meer informatie.

Bijdragen

Wij verwelkomen bijdragen en suggesties voor dit project. Voor de meeste bijdragen moet u instemmen met een licentieovereenkomst voor bijdragers (CLA: Contributor License Agreement) waarin u verklaart dat u gerechtigd bent ons het recht te geven uw bijdrage te gebruiken, en dat u dit ook doet. Ga naar https://cla.microsoft.com voor meer informatie.

Wanneer u een pull-aanvraag indient, wordt met een CLA-bot automatisch bepaald of u een CLA moet verschaffen en wordt de pull-aanvraag dienovereenkomstig opgemaakt (bijvoorbeeld met een label of commentaar). Volg gewoon de instructies van de bot. U hoeft dit maar eenmaal te doen voor alle repo's waar gebruik wordt gemaakt van onze CLA.

Op dit project is de Microsoft Open Source Code of Conduct (Microsoft Open Source-gedragscode) van toepassing. Zie de Veelgestelde vragen over de gedragscode voor meer informatie of neem contact op opencode@microsoft.com met eventuele aanvullende vragen of opmerkingen.

Raadpleeg onze gids voor bijdragen voor meer informatie.

Weergaven