Udostępnij za pośrednictwem


Weryfikowanie przy użyciu schematu Avro podczas przesyłania strumieniowego zdarzeń przy użyciu zestawów SDK platformy .NET usługi Event Hubs (AMQP)

Z tego przewodnika Szybki start dowiesz się, jak wysyłać zdarzenia do centrum zdarzeń i odbierać je z centrum zdarzeń przy użyciu weryfikacji schematu przy użyciu biblioteki Azure.Messaging.EventHubs .NET.

Uwaga

Usługa Azure Schema Registry to funkcja usługi Event Hubs, która udostępnia centralne repozytorium schematów dla aplikacji opartych na zdarzeniach i komunikatach. Zapewnia ona elastyczność wymiany danych przez aplikacje producentów i konsumentów bez konieczności zarządzania schematem i udostępniania go. Zapewnia również prostą strukturę ładu dla schematów wielokrotnego użytku i definiuje relację między schematami za pomocą konstrukcji grupowania (grup schematów). Aby uzyskać więcej informacji, zobacz Rejestr schematów platformy Azure w usłudze Event Hubs.

Wymagania wstępne

Jeśli jesteś nowym użytkownikem Azure Event Hubs, zobacz Omówienie usługi Event Hubs przed wykonaniem tego przewodnika Szybki start.

Do wykonania kroków tego przewodnika Szybki start niezbędne jest spełnienie następujących wymagań wstępnych:

  • Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto .
  • Microsoft Visual Studio 2022. Biblioteka klienta Azure Event Hubs korzysta z nowych funkcji, które zostały wprowadzone w języku C# 8.0. Nadal można używać biblioteki z poprzednimi wersjami języka C#, ale nowa składnia nie jest dostępna. Aby korzystać z pełnej składni, zalecamy skompilowanie przy użyciu zestawu .NET Core SDK 3.0 lub nowszej i wersji językowej ustawionej na latestwartość . Jeśli używasz programu Visual Studio, wersje przed programem Visual Studio 2019 nie są zgodne z narzędziami wymaganymi do kompilowania projektów w języku C# 8.0. Program Visual Studio 2019, w tym bezpłatna wersja Community, można pobrać tutaj.

Tworzenie centrum zdarzeń

Postępuj zgodnie z instrukcjami z przewodnika Szybki start: tworzenie przestrzeni nazw usługi Event Hubs i centrum zdarzeń w celu utworzenia przestrzeni nazw usługi Event Hubs i centrum zdarzeń. Następnie postępuj zgodnie z instrukcjami z sekcji Pobieranie parametrów połączenia , aby pobrać parametry połączenia do przestrzeni nazw usługi Event Hubs.

Zanotuj następujące ustawienia, których użyjesz w bieżącym przewodniku Szybki start:

  • Parametry połączenia dla przestrzeni nazw usługi Event Hubs
  • Nazwa centrum zdarzeń

Tworzenie schematu

Postępuj zgodnie z instrukcjami w temacie Tworzenie schematów przy użyciu rejestru schematów , aby utworzyć grupę schematów i schemat.

  1. Utwórz grupę schematów o nazwie contoso-sg przy użyciu portalu rejestru schematów. Użyj avro jako typu serializacji i None dla trybu zgodności.

  2. W tej grupie schematów utwórz nowy schemat Avro o nazwie schematu: Microsoft.Azure.Data.SchemaRegistry.example.Order przy użyciu następującej zawartości schematu.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Dodawanie użytkownika do roli Czytelnik rejestru schematów

Dodaj konto użytkownika do roli Czytelnik rejestru schematów na poziomie przestrzeni nazw. Możesz również użyć roli Współautor rejestru schematów , ale nie jest to konieczne w tym przewodniku Szybki start.

  1. Na stronie Przestrzeń nazw usługi Event Hubs wybierz pozycję Kontrola dostępu (Zarządzanie dostępem i tożsamościami) w menu po lewej stronie.
  2. Na stronie Kontrola dostępu (zarządzanie dostępem i tożsamościami) wybierz pozycję + Dodaj ->Dodaj przypisanie roli w menu.
  3. Na stronie Typ przypisania wybierz pozycję Dalej.
  4. Na stronie Role wybierz pozycję Czytelnik rejestru schematów (wersja zapoznawcza), a następnie wybierz pozycję Dalej w dolnej części strony.
  5. Użyj linku + Wybierz członków , aby dodać konto użytkownika do roli, a następnie wybierz pozycję Dalej.
  6. Na stronie Przeglądanie i przypisywanie wybierz pozycję Przejrzyj i przypisz.

Tworzenie zdarzeń w centrach zdarzeń przy użyciu weryfikacji schematu

Tworzenie aplikacji konsolowej dla producenta zdarzeń

  1. Uruchom program Visual Studio 2019.
  2. Wybierz pozycję Utwórz nowy projekt.
  3. W oknie dialogowym Tworzenie nowego projektu wykonaj następujące kroki: Jeśli to okno dialogowe nie jest widoczne, wybierz pozycję Plik w menu, wybierz pozycję Nowy, a następnie wybierz pozycję Projekt.
    1. Wybierz język C# dla języka programowania.

    2. Wybierz pozycję Konsola dla typu aplikacji.

    3. Wybierz pozycję Aplikacja konsolowa z listy wyników.

    4. Następnie wybierz pozycję Dalej.

      Obraz przedstawiający okno dialogowe Nowy projekt.

  4. Wprowadź wartość OrderProducer jako nazwę projektu SRQuickStart jako nazwę rozwiązania, a następnie wybierz przycisk OK , aby utworzyć projekt.

Dodawanie pakietu NuGet usługi Event Hubs

  1. Wybierz pozycję Narzędzia>Konsola menedżera pakietówNuGet Package Manager> w menu.

  2. Uruchom następujące polecenia, aby zainstalować usługę Azure.Messaging.EventHubs i inne pakiety NuGet. Naciśnij klawisz ENTER , aby uruchomić ostatnie polecenie.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Uwierzytelnianie aplikacji producentów w celu nawiązania połączenia z platformą Azure za pośrednictwem programu Visual Studio, jak pokazano tutaj.

  4. Zaloguj się do platformy Azure przy użyciu konta użytkownika, które jest członkiem Schema Registry Reader roli na poziomie przestrzeni nazw. Aby uzyskać informacje o rolach rejestru schematów, zobacz Rejestr schematów platformy Azure w usłudze Event Hubs.

Generowanie kodu przy użyciu schematu Avro

  1. Użyj tej samej zawartości, która została użyta do utworzenia schematu w celu utworzenia pliku o nazwie Order.avsc. Zapisz plik w folderze projektu lub rozwiązania.
  2. Następnie możesz użyć tego pliku schematu do wygenerowania kodu dla platformy .NET. Do generowania kodu można użyć dowolnego narzędzia do generowania kodu zewnętrznego, takiego jak avrogen . Na przykład możesz uruchomić polecenie avrogen -s .\Order.avsc . w celu wygenerowania kodu.
  3. Po wygenerowaniu kodu zobaczysz plik o nazwie Order.cs w folderze \Microsoft\Azure\Data\SchemaRegistry\example . Dla powyższego schematu Avro generuje typy języka C# w Microsoft.Azure.Data.SchemaRegistry.example przestrzeni nazw.
  4. Order.cs Dodaj plik do OrderProducer projektu.

Pisanie kodu w celu serializacji i wysyłania zdarzeń do centrum zdarzeń

  1. Dodaj następujący kod do pliku Program.cs. Zobacz komentarze kodu, aby uzyskać szczegółowe informacje. Ogólne kroki w kodzie są następujące:

    1. Utwórz klienta producenta, którego można użyć do wysyłania zdarzeń do centrum zdarzeń.
    2. Utwórz klienta rejestru schematów, którego można użyć do serializacji i weryfikowania danych w Order obiekcie.
    3. Utwórz nowy Order obiekt przy użyciu wygenerowanego Order typu.
    4. Użyj klienta rejestru schematów, aby serializować Order obiekt na EventData.
    5. Utwórz partię zdarzeń.
    6. Dodaj dane zdarzenia do partii zdarzeń.
    7. Użyj klienta producenta, aby wysłać partię zdarzeń do centrum zdarzeń.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. Zastąp następujące wartości zastępcze rzeczywistymi wartościami.

    • EVENTHUBSNAMESPACECONNECTIONSTRING — parametry połączenia dla przestrzeni nazw usługi Event Hubs
    • EVENTHUBNAME — nazwa centrum zdarzeń
    • EVENTHUBSNAMESPACENAME — nazwa przestrzeni nazw usługi Event Hubs
    • SCHEMAGROUPNAME - nazwa grupy schematów
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. Skompiluj projekt i upewnij się, że nie ma żadnych błędów.

  4. Uruchom program i poczekaj na komunikat potwierdzenia.

    A batch of 1 order has been published.
    
  5. W Azure Portal możesz sprawdzić, czy centrum zdarzeń odebrało zdarzenia. Przejdź do widoku Komunikaty w sekcji Metryki . Odśwież stronę, aby zaktualizować wykres. Wyświetlenie komunikatów odebranych może potrwać kilka sekund.

    Obraz przedstawiający stronę Azure Portal w celu sprawdzenia, czy centrum zdarzeń odebrało zdarzenia.

Korzystanie ze zdarzeń z centrów zdarzeń z weryfikacją schematu

W tej sekcji pokazano, jak napisać aplikację konsolową platformy .NET Core, która odbiera zdarzenia z centrum zdarzeń i używa rejestru schematów do deserializacji danych zdarzeń.

Dodatkowe wymagania wstępne

  • Utwórz konto magazynu do użycia z procesorem zdarzeń.

Tworzenie aplikacji dla konsumentów

  1. W oknie Eksplorator rozwiązań kliknij prawym przyciskiem myszy rozwiązanie SRQuickStart, wskaż polecenie Dodaj i wybierz pozycję Nowy projekt.
  2. Wybierz pozycję Aplikacja konsolowa, a następnie wybierz pozycję Dalej.
  3. Wprowadź wartość OrderConsumer jako nazwę projektu, a następnie wybierz pozycję Utwórz.
  4. W oknie Eksplorator rozwiązań kliknij prawym przyciskiem myszy pozycję OrderConsumer i wybierz polecenie Ustaw jako projekt startowy.

Dodawanie pakietu NuGet usługi Event Hubs

  1. Wybierz pozycję Narzędzia>Konsola menedżera pakietówNuGet Package Manager> w menu.

  2. W oknie Konsola menedżera pakietów upewnij się, że dla projektu Domyślne wybrano pozycję OrderConsumer. Jeśli nie, użyj listy rozwijanej, aby wybrać pozycję OrderConsumer.

  3. Uruchom następujące polecenie, aby zainstalować wymagane pakiety NuGet. Naciśnij klawisz ENTER , aby uruchomić ostatnie polecenie.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. Uwierzytelnianie aplikacji producentów w celu nawiązania połączenia z platformą Azure za pośrednictwem programu Visual Studio, jak pokazano tutaj.

  5. Zaloguj się do platformy Azure przy użyciu konta użytkownika, które jest członkiem Schema Registry Reader roli na poziomie przestrzeni nazw. Aby uzyskać informacje o rolach rejestru schematów, zobacz Rejestr schematów platformy Azure w usłudze Event Hubs.

  6. Order.cs Dodaj plik wygenerowany w ramach tworzenia aplikacji producenta do projektu OrderConsumer.

  7. Kliknij prawym przyciskiem myszy projekt OrderConsumer i wybierz polecenie Ustaw jako projekt startowy.

Pisanie kodu w celu odbierania zdarzeń i deserializacji ich przy użyciu rejestru schematów

  1. Dodaj następujący kod do pliku Program.cs. Zobacz komentarze kodu, aby uzyskać szczegółowe informacje. Ogólne kroki w kodzie są następujące:

    1. Utwórz klienta odbiorcy, którego można użyć do wysyłania zdarzeń do centrum zdarzeń.
    2. Utwórz klienta kontenera obiektów blob dla kontenera obiektów blob w usłudze Azure Blob Storage.
    3. Utwórz klienta procesora zdarzeń i zarejestruj procedury obsługi zdarzeń i błędów.
    4. W procedurze obsługi zdarzeń utwórz klienta rejestru schematów, którego można użyć do deserializacji danych zdarzeń do Order obiektu.
    5. Deserializowanie danych zdarzenia do Order obiektu przy użyciu serializatora.
    6. Wydrukuj informacje o odebranych zamówieniach.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be userd as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. Zastąp następujące wartości zastępcze rzeczywistymi wartościami.

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING — parametry połączenia dla przestrzeni nazw usługi Event Hubs
    • EVENTHUBNAME — nazwa centrum zdarzeń
    • EVENTHUBSNAMESPACENAME — nazwa przestrzeni nazw usługi Event Hubs
    • SCHEMAGROUPNAME - nazwa grupy schematów
    • AZURESTORAGECONNECTIONSTRING — parametry połączenia dla konta usługi Azure Storage
    • BLOBCONTAINERNAME - Nazwa kontenera obiektów blob
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. Skompiluj projekt i upewnij się, że nie ma żadnych błędów.

  4. Uruchom aplikację odbiorcy.

  5. Powinien zostać wyświetlony komunikat informujący o odebraniu zdarzeń.

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    Te zdarzenia to trzy zdarzenia wysłane wcześniej do centrum zdarzeń, uruchamiając program nadawcy.

Przykłady

Zobacz artykuł Readme w naszym repozytorium GitHub.

Czyszczenie zasobów

Usuń przestrzeń nazw usługi Event Hubs lub usuń grupę zasobów zawierającą przestrzeń nazw.

Następne kroki