Verifiera med hjälp av ett Avro-schema vid strömning av händelser med hjälp av Event Hubs .NET SDK:er (AMQP)
I den här snabbstarten får du lära dig hur du skickar händelser till och tar emot händelser från en händelsehubb med schemavalidering med hjälp av Azure.Messaging.EventHubs .NET-biblioteket.
Anteckning
Azure Schema Registry är en funktion i Event Hubs, som tillhandahåller en central lagringsplats för scheman för händelsedrivna och meddelandecentrerade program. Det ger flexibiliteten för dina producent- och konsumentprogram att utbyta data utan att behöva hantera och dela schemat. Den tillhandahåller också ett enkelt styrningsramverk för återanvändbara scheman och definierar relationen mellan scheman via en grupperingskonstruktion (schemagrupper). Mer information finns i Azure Schema Registry i Event Hubs.
Förutsättningar
Om du inte har Azure Event Hubs kan du läsa Översikt över Event Hubs innan du gör den här snabbstarten.
För att slutföra den här snabbstarten, behöver du följande förhandskrav:
- Om du inte har en Azure-prenumeration skapar du ett kostnadsfritt konto innan du börjar.
-
Microsoft Visual Studio 2022.
Azure Event Hubs-klientbiblioteket använder nya funktioner som introducerades i C# 8.0. Du kan fortfarande använda biblioteket med tidigare C#-språkversioner, men den nya syntaxen är inte tillgänglig. Om du vill använda den fullständiga syntaxen rekommenderar vi att du kompilerar med .NET Core SDK 3.0 eller senare och att språkversionen är inställd på
latest
. Om du använder Visual Studio är versioner innan Visual Studio 2019 inte kompatibla med de verktyg som behövs för att skapa C# 8.0-projekt. Visual Studio 2019, inklusive den kostnadsfria Community Edition, kan laddas ned här.
Skapa en händelsehubb
Följ anvisningarna i snabbstarten: Skapa ett Event Hubs-namnområde och en händelsehubb för att skapa en Event Hubs-namnrymd och en händelsehubb. Följ sedan anvisningarna från Hämta anslutningssträngen för att hämta en anslutningssträng till Event Hubs-namnområdet.
Anteckna följande inställningar som du använder i den aktuella snabbstarten:
- Anslutningssträng för Event Hubs-namnområdet
- Namn på händelsehubben
Skapa ett schema
Följ anvisningarna i Skapa scheman med schemaregistret för att skapa en schemagrupp och ett schema.
Skapa en schemagrupp med namnet contoso-sg med hjälp av Schema Registry-portalen. Använd Avro som serialiseringstyp och Ingen för kompatibilitetsläget.
I den schemagruppen skapar du ett nytt Avro-schema med schemanamn:
Microsoft.Azure.Data.SchemaRegistry.example.Order
med hjälp av följande schemainnehåll.{ "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
Lägg till användare i rollen Schema Registry Reader
Lägg till ditt användarkonto i rollen Schema Registry Reader på namnområdesnivå. Du kan också använda rollen Schema Registry-deltagare , men det är inte nödvändigt för den här snabbstarten.
- På sidan Event Hubs-namnområde väljer du Åtkomstkontroll (IAM) på den vänstra menyn.
- På sidan Åtkomstkontroll (IAM) väljer du + Lägg till ->Lägg till rolltilldelning på menyn.
- På sidan Tilldelningstyp väljer du Nästa.
- På sidan Roller väljer du Schema Registry Reader (förhandsversion) och sedan Nästa längst ned på sidan.
- Använd länken + Välj medlemmar för att lägga till ditt användarkonto i rollen och välj sedan Nästa.
- På sidan Granska + tilldela väljer du Granska + tilldela.
Skapa händelser till händelsehubbar med schemavalidering
Skapa konsolprogram för händelseproducent
- Starta Visual Studio 2019.
- Välj Skapa ett nytt projekt.
- I dialogrutan Skapa ett nytt projekt gör du följande: Om du inte ser den här dialogrutan väljer du Arkiv på menyn, väljer Nytt och sedan Projekt.
Välj C# som programmeringsspråk.
Välj Konsol för programmets typ.
Välj Konsolprogram i resultatlistan.
Välj Nästa.
- Ange OrderProducer som projektnamn, SRQuickStart som lösningsnamn och välj sedan OK för att skapa projektet.
Lägga till Event Hubs NuGet-paketet
Välj Verktyg>NuGet Package Manager Package Manager>Console på menyn.
Kör följande kommandon för att installera Azure.Messaging.EventHubs och andra NuGet-paket. Tryck på RETUR för att köra det sista kommandot.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Autentisera producentprogram för att ansluta till Azure via Visual Studio enligt nedan.
Logga in på Azure med det användarkonto som är medlem
Schema Registry Reader
i rollen på namnområdesnivå. Information om schemaregisterroller finns i Azure Schema Registry i Event Hubs.
Kodgenerering med Hjälp av Avro-schemat
- Använd samma innehåll som du använde för att skapa schemat för att skapa en fil med namnet
Order.avsc
. Spara filen i projekt- eller lösningsmappen. - Sedan kan du använda den här schemafilen för att generera kod för .NET. Du kan använda alla externa kodgenereringsverktyg, till exempel avrogen för kodgenerering. Du kan till exempel köra
avrogen -s .\Order.avsc .
för att generera kod. - När du har genererat kod visas filen med namnet
Order.cs
i\Microsoft\Azure\Data\SchemaRegistry\example
mappen . För avro-schemat ovan genererar det C#-typerna iMicrosoft.Azure.Data.SchemaRegistry.example
namnområdet. -
Order.cs
Lägg till filen iOrderProducer
projektet.
Skriva kod för att serialisera och skicka händelser till händelsehubben
Lägg till följande kod i
Program.cs
-filen: Mer information finns i kodkommentarna. Övergripande steg i koden är:- Skapa en producentklient som du kan använda för att skicka händelser till en händelsehubb.
- Skapa en schemaregisterklient som du kan använda för att serialisera och verifiera data i ett
Order
objekt. - Skapa ett nytt
Order
objekt med den genereradeOrder
typen. - Använd schemaregisterklienten för att serialisera objektet
Order
tillEventData
. - Skapa en batch med händelser.
- Lägg till händelsedata i händelsebatchen.
- Använd producentklienten för att skicka batchen med händelser till händelsehubben.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // The Event Hubs client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when events are being published or read regularly. EventHubProducerClient producerClient; // Create a producer client that you can use to send events to an event hub producerClient = new EventHubProducerClient(connectionString, eventHubName); // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Create a new order object using the generated type/class 'Order'. var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." }; EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData)); // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add the event data to the event batch. eventBatch.TryAdd(eventData); // Send the batch of events to the event hub. await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 1 order has been published.");
Ersätt följande platshållarvärden med de verkliga värdena.
-
EVENTHUBSNAMESPACECONNECTIONSTRING
– anslutningssträng för Event Hubs-namnområdet -
EVENTHUBNAME
– namnet på händelsehubben -
EVENTHUBSNAMESPACENAME
– namn på Event Hubs-namnområdet -
SCHEMAGROUPNAME
- schemagruppens namn
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME";
-
Skapa projektet och se till att det inte finns några fel.
Kör programmet och vänta på bekräftelsemeddelandet.
A batch of 1 order has been published.
I Azure Portal kan du kontrollera att händelsehubben har tagit emot händelserna. Växla till meddelandevyn i avsnittet Mått . Uppdatera sidan för att uppdatera diagrammet. Det kan ta några sekunder för det att visa att meddelandena har tagits emot.
Använda händelser från händelsehubbar med schemavalidering
Det här avsnittet visar hur du skriver ett .NET Core-konsolprogram som tar emot händelser från en händelsehubb och använder schemaregistret för att deserialisera händelsedata.
Ytterligare krav
- Skapa lagringskontot som ska användas av händelseprocessorn.
Skapa konsumentprogram
- I fönstret Solution Explorer högerklickar du på SRQuickStart-lösningen, pekar på Lägg till och väljer Nytt projekt.
- Välj Konsolprogram och välj Nästa.
- Ange OrderConsumer som Projektnamn och välj Skapa.
- I fönstret Solution Explorer högerklickar du på OrderConsumer och väljer Ange som ett startprojekt.
Lägga till Event Hubs NuGet-paketet
Välj Verktyg>NuGet Package Manager Package Manager>Console på menyn.
I fönstret Package Manager Console (Pakethanterarens konsol ) bekräftar du att OrderConsumer har valts för standardprojektet. Annars använder du listrutan för att välja OrderConsumer.
Kör följande kommando för att installera nödvändiga NuGet-paket. Tryck på RETUR för att köra det sista kommandot.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Messaging.EventHubs.Processor Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Autentisera producentprogram för att ansluta till Azure via Visual Studio enligt nedan.
Logga in på Azure med det användarkonto som är medlem
Schema Registry Reader
i rollen på namnområdesnivå. Information om schemaregisterroller finns i Azure Schema Registry i Event Hubs.Order.cs
Lägg till filen som du genererade som en del av skapandet av producentappen i Projektet OrderConsumer.Högerklicka på OrderConsumer-projektet och välj Ange som startprojekt.
Skriva kod för att ta emot händelser och deserialisera dem med schemaregistret
Lägg till följande kod i
Program.cs
-filen: Mer information finns i kodkommentarna. Övergripande steg i koden är:- Skapa en konsumentklient som du kan använda för att skicka händelser till en händelsehubb.
- Skapa en blobcontainerklient för blobcontainern i Azure Blob Storage.
- Skapa en händelseprocessorklient och registrera händelse- och felhanterare.
- I händelsehanteraren skapar du en schemaregisterklient som du kan använda för att deserialisera händelsedata till ett
Order
objekt. - Deserialisera händelsedata till ett
Order
objekt med serialiseraren. - Skriv ut informationen om den mottagna ordern.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // connection string for the Azure Storage account const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // name of the blob container that will be used as a checkpoint store const string blobContainerName = "BLOBCONTAINERNAME"; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 30 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(30)); // Stop the processing await processor.StopProcessingAsync(); static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Deserialized data in the received event using the schema Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order)); // Print the received event Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}"); await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
Ersätt följande platshållarvärden med de verkliga värdena.
-
EVENTHUBSNAMESPACE-CONNECTIONSTRING
– anslutningssträng för Event Hubs-namnområdet -
EVENTHUBNAME
– namnet på händelsehubben -
EVENTHUBSNAMESPACENAME
– namn på Event Hubs-namnområdet -
SCHEMAGROUPNAME
- schemagruppens namn -
AZURESTORAGECONNECTIONSTRING
– anslutningssträng för Azure Storage-kontot -
BLOBCONTAINERNAME
– Namnet på blobcontainern
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // Azure storage connection string const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // Azure blob container name const string blobContainerName = "BLOBCONTAINERNAME";
-
Skapa projektet och se till att det inte finns några fel.
Kör mottagarprogrammet.
Du bör se ett meddelande om att händelserna har tagits emot.
Received order with ID: 1234, amount: 45.29, description: First sample order.
Dessa händelser är de tre händelser som du skickade till händelsehubben tidigare genom att köra avsändarprogrammet.
Exempel
Se Readme-artikeln på vår GitHub-lagringsplats.
Rensa resurser
Ta bort Event Hubs-namnområdet eller ta bort den resursgrupp som innehåller namnområdet.