Dela via


Verifiera med hjälp av ett Avro-schema vid strömning av händelser med hjälp av Event Hubs .NET SDK:er (AMQP)

I den här snabbstarten får du lära dig hur du skickar händelser till och tar emot händelser från en händelsehubb med schemavalidering med hjälp av Azure.Messaging.EventHubs .NET-biblioteket.

Anteckning

Azure Schema Registry är en funktion i Event Hubs, som tillhandahåller en central lagringsplats för scheman för händelsedrivna och meddelandecentrerade program. Det ger flexibiliteten för dina producent- och konsumentprogram att utbyta data utan att behöva hantera och dela schemat. Den tillhandahåller också ett enkelt styrningsramverk för återanvändbara scheman och definierar relationen mellan scheman via en grupperingskonstruktion (schemagrupper). Mer information finns i Azure Schema Registry i Event Hubs.

Förutsättningar

Om du inte har Azure Event Hubs kan du läsa Översikt över Event Hubs innan du gör den här snabbstarten.

För att slutföra den här snabbstarten, behöver du följande förhandskrav:

  • Om du inte har en Azure-prenumeration skapar du ett kostnadsfritt konto innan du börjar.
  • Microsoft Visual Studio 2022. Azure Event Hubs-klientbiblioteket använder nya funktioner som introducerades i C# 8.0. Du kan fortfarande använda biblioteket med tidigare C#-språkversioner, men den nya syntaxen är inte tillgänglig. Om du vill använda den fullständiga syntaxen rekommenderar vi att du kompilerar med .NET Core SDK 3.0 eller senare och att språkversionen är inställd på latest. Om du använder Visual Studio är versioner innan Visual Studio 2019 inte kompatibla med de verktyg som behövs för att skapa C# 8.0-projekt. Visual Studio 2019, inklusive den kostnadsfria Community Edition, kan laddas ned här.

Skapa en händelsehubb

Följ anvisningarna i snabbstarten: Skapa ett Event Hubs-namnområde och en händelsehubb för att skapa en Event Hubs-namnrymd och en händelsehubb. Följ sedan anvisningarna från Hämta anslutningssträngen för att hämta en anslutningssträng till Event Hubs-namnområdet.

Anteckna följande inställningar som du använder i den aktuella snabbstarten:

  • Anslutningssträng för Event Hubs-namnområdet
  • Namn på händelsehubben

Skapa ett schema

Följ anvisningarna i Skapa scheman med schemaregistret för att skapa en schemagrupp och ett schema.

  1. Skapa en schemagrupp med namnet contoso-sg med hjälp av Schema Registry-portalen. Använd Avro som serialiseringstyp och Ingen för kompatibilitetsläget.

  2. I den schemagruppen skapar du ett nytt Avro-schema med schemanamn: Microsoft.Azure.Data.SchemaRegistry.example.Order med hjälp av följande schemainnehåll.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Lägg till användare i rollen Schema Registry Reader

Lägg till ditt användarkonto i rollen Schema Registry Reader på namnområdesnivå. Du kan också använda rollen Schema Registry-deltagare , men det är inte nödvändigt för den här snabbstarten.

  1. På sidan Event Hubs-namnområde väljer du Åtkomstkontroll (IAM) på den vänstra menyn.
  2. På sidan Åtkomstkontroll (IAM) väljer du + Lägg till ->Lägg till rolltilldelning på menyn.
  3. På sidan Tilldelningstyp väljer du Nästa.
  4. På sidan Roller väljer du Schema Registry Reader (förhandsversion) och sedan Nästa längst ned på sidan.
  5. Använd länken + Välj medlemmar för att lägga till ditt användarkonto i rollen och välj sedan Nästa.
  6. På sidan Granska + tilldela väljer du Granska + tilldela.

Skapa händelser till händelsehubbar med schemavalidering

Skapa konsolprogram för händelseproducent

  1. Starta Visual Studio 2019.
  2. Välj Skapa ett nytt projekt.
  3. I dialogrutan Skapa ett nytt projekt gör du följande: Om du inte ser den här dialogrutan väljer du Arkiv på menyn, väljer Nytt och sedan Projekt.
    1. Välj C# som programmeringsspråk.

    2. Välj Konsol för programmets typ.

    3. Välj Konsolprogram i resultatlistan.

    4. Välj Nästa.

      Bild som visar dialogrutan Nytt projekt.

  4. Ange OrderProducer som projektnamn, SRQuickStart som lösningsnamn och välj sedan OK för att skapa projektet.

Lägga till Event Hubs NuGet-paketet

  1. Välj Verktyg>NuGet Package Manager Package Manager>Console på menyn.

  2. Kör följande kommandon för att installera Azure.Messaging.EventHubs och andra NuGet-paket. Tryck på RETUR för att köra det sista kommandot.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Autentisera producentprogram för att ansluta till Azure via Visual Studio enligt nedan.

  4. Logga in på Azure med det användarkonto som är medlem Schema Registry Reader i rollen på namnområdesnivå. Information om schemaregisterroller finns i Azure Schema Registry i Event Hubs.

Kodgenerering med Hjälp av Avro-schemat

  1. Använd samma innehåll som du använde för att skapa schemat för att skapa en fil med namnet Order.avsc. Spara filen i projekt- eller lösningsmappen.
  2. Sedan kan du använda den här schemafilen för att generera kod för .NET. Du kan använda alla externa kodgenereringsverktyg, till exempel avrogen för kodgenerering. Du kan till exempel köra avrogen -s .\Order.avsc . för att generera kod.
  3. När du har genererat kod visas filen med namnet Order.cs i \Microsoft\Azure\Data\SchemaRegistry\example mappen . För avro-schemat ovan genererar det C#-typerna i Microsoft.Azure.Data.SchemaRegistry.example namnområdet.
  4. Order.cs Lägg till filen i OrderProducer projektet.

Skriva kod för att serialisera och skicka händelser till händelsehubben

  1. Lägg till följande kod i Program.cs-filen: Mer information finns i kodkommentarna. Övergripande steg i koden är:

    1. Skapa en producentklient som du kan använda för att skicka händelser till en händelsehubb.
    2. Skapa en schemaregisterklient som du kan använda för att serialisera och verifiera data i ett Order objekt.
    3. Skapa ett nytt Order objekt med den genererade Order typen.
    4. Använd schemaregisterklienten för att serialisera objektet Order till EventData.
    5. Skapa en batch med händelser.
    6. Lägg till händelsedata i händelsebatchen.
    7. Använd producentklienten för att skicka batchen med händelser till händelsehubben.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. Ersätt följande platshållarvärden med de verkliga värdena.

    • EVENTHUBSNAMESPACECONNECTIONSTRING – anslutningssträng för Event Hubs-namnområdet
    • EVENTHUBNAME – namnet på händelsehubben
    • EVENTHUBSNAMESPACENAME – namn på Event Hubs-namnområdet
    • SCHEMAGROUPNAME - schemagruppens namn
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. Skapa projektet och se till att det inte finns några fel.

  4. Kör programmet och vänta på bekräftelsemeddelandet.

    A batch of 1 order has been published.
    
  5. I Azure Portal kan du kontrollera att händelsehubben har tagit emot händelserna. Växla till meddelandevyn i avsnittet Mått . Uppdatera sidan för att uppdatera diagrammet. Det kan ta några sekunder för det att visa att meddelandena har tagits emot.

    Bild av Azure Portal-sidan för att verifiera att händelsehubben tog emot händelserna.

Använda händelser från händelsehubbar med schemavalidering

Det här avsnittet visar hur du skriver ett .NET Core-konsolprogram som tar emot händelser från en händelsehubb och använder schemaregistret för att deserialisera händelsedata.

Ytterligare krav

  • Skapa lagringskontot som ska användas av händelseprocessorn.

Skapa konsumentprogram

  1. I fönstret Solution Explorer högerklickar du på SRQuickStart-lösningen, pekar på Lägg till och väljer Nytt projekt.
  2. Välj Konsolprogram och välj Nästa.
  3. Ange OrderConsumer som Projektnamn och välj Skapa.
  4. I fönstret Solution Explorer högerklickar du på OrderConsumer och väljer Ange som ett startprojekt.

Lägga till Event Hubs NuGet-paketet

  1. Välj Verktyg>NuGet Package Manager Package Manager>Console på menyn.

  2. I fönstret Package Manager Console (Pakethanterarens konsol ) bekräftar du att OrderConsumer har valts för standardprojektet. Annars använder du listrutan för att välja OrderConsumer.

  3. Kör följande kommando för att installera nödvändiga NuGet-paket. Tryck på RETUR för att köra det sista kommandot.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. Autentisera producentprogram för att ansluta till Azure via Visual Studio enligt nedan.

  5. Logga in på Azure med det användarkonto som är medlem Schema Registry Reader i rollen på namnområdesnivå. Information om schemaregisterroller finns i Azure Schema Registry i Event Hubs.

  6. Order.cs Lägg till filen som du genererade som en del av skapandet av producentappen i Projektet OrderConsumer.

  7. Högerklicka på OrderConsumer-projektet och välj Ange som startprojekt.

Skriva kod för att ta emot händelser och deserialisera dem med schemaregistret

  1. Lägg till följande kod i Program.cs-filen: Mer information finns i kodkommentarna. Övergripande steg i koden är:

    1. Skapa en konsumentklient som du kan använda för att skicka händelser till en händelsehubb.
    2. Skapa en blobcontainerklient för blobcontainern i Azure Blob Storage.
    3. Skapa en händelseprocessorklient och registrera händelse- och felhanterare.
    4. I händelsehanteraren skapar du en schemaregisterklient som du kan använda för att deserialisera händelsedata till ett Order objekt.
    5. Deserialisera händelsedata till ett Order objekt med serialiseraren.
    6. Skriv ut informationen om den mottagna ordern.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be used as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. Ersätt följande platshållarvärden med de verkliga värdena.

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING – anslutningssträng för Event Hubs-namnområdet
    • EVENTHUBNAME – namnet på händelsehubben
    • EVENTHUBSNAMESPACENAME – namn på Event Hubs-namnområdet
    • SCHEMAGROUPNAME - schemagruppens namn
    • AZURESTORAGECONNECTIONSTRING – anslutningssträng för Azure Storage-kontot
    • BLOBCONTAINERNAME – Namnet på blobcontainern
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. Skapa projektet och se till att det inte finns några fel.

  4. Kör mottagarprogrammet.

  5. Du bör se ett meddelande om att händelserna har tagits emot.

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    Dessa händelser är de tre händelser som du skickade till händelsehubben tidigare genom att köra avsändarprogrammet.

Exempel

Se Readme-artikeln på vår GitHub-lagringsplats.

Rensa resurser

Ta bort Event Hubs-namnområdet eller ta bort den resursgrupp som innehåller namnområdet.

Nästa steg