Поделиться через


Проверка с помощью схемы Avro при потоковой передаче событий с помощью пакетов SDK для .NET (AMQP) Центров событий

Из этого краткого руководства вы узнаете, как отправлять события в концентратор событий и получать их из концентратора событий с помощью проверки схемы с помощью библиотеки Azure.Messaging.EventHubs .NET.

Примечание

Реестр схем Azure — это функция Центров событий, которая предоставляет центральный репозиторий для схем для управляемых событиями и ориентированных на обмен сообщениями приложений. Он обеспечивает гибкость работы приложений-производителей и приложений-получателей при обмене данными, избавляя от необходимости управлять схемой и позволяя совместно использовать ее в приложениях обоих типов. Реестр схем также предоставляет простую платформу управления для повторно используемых схем и определяет связь между схемами через конструкцию группировки (группы схем). Дополнительные сведения см. в статье Реестр схем Azure в Центрах событий.

Предварительные требования

Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями, прежде чем приступить к работе с этим руководством.

Для работы с данным руководством необходимо следующее:

  • Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начать работу.
  • Microsoft Visual Studio 2022. Клиентская библиотека для Центров событий Azure использует новые возможности, реализованные в C# версии 8.0. Вы по-прежнему можете использовать библиотеку с предыдущими версиями языка C#, но новый синтаксис недоступен. Чтобы использовать полный синтаксис, рекомендуется выполнять компиляцию с помощью пакета SDK для .NET Core 3.0 или более поздней версии, а для языковой версии должно быть установлено значение latest. Если вы используете Visual Studio, учитывайте, что версии до Visual Studio 2019 несовместимы со средствами, необходимыми для сборки проектов C# 8.0. Скачать Visual Studio 2019, в том числе бесплатный выпуск Community, можно здесь.

Создание концентратора событий

Следуйте инструкциям из краткого руководства по созданию пространства имен Центров событий и концентратора событий , чтобы создать пространство имен Центров событий и концентратор событий. Затем следуйте инструкциям из раздела Получение строки подключения , чтобы получить строку подключения к пространству имен Центров событий.

Запишите на указанные ниже параметры, которые будут использоваться в этом кратком руководстве.

  • Строка подключения пространства имен Центров событий
  • Имя концентратора событий

Создание схемы

С помощью инструкций из статьи о создании схем с помощью реестра схем создайте группу схем и схему.

  1. Создайте группу схем с именем contoso-sg с помощью портала реестра схем. В качестве типа сериализации укажите Avro, в качестве режима совместимости — None (т. е. без совместимости).

  2. В этой группе схем создайте новую схему Avro с именем Microsoft.Azure.Data.SchemaRegistry.example.Order и указанным ниже содержимым.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Добавление пользователя в роль читателя реестра схем

Добавьте учетную запись пользователя в роль читателя реестра схем на уровне пространства имен. Вы также можете использовать роль Участник реестра схем , но это необязательно для работы с этим кратким руководством.

  1. На странице Пространство имен Центров событий выберите Управление доступом (IAM) в меню слева.
  2. На странице Управление доступом (IAM) выберите + Добавить ->Добавить назначение роли в меню.
  3. На странице Тип назначения нажмите кнопку Далее.
  4. На странице Роли выберите Читатель реестра схем (предварительная версия) и нажмите кнопку Далее в нижней части страницы.
  5. Используйте ссылку + Выбрать участников , чтобы добавить учетную запись пользователя в роль, а затем нажмите кнопку Далее.
  6. На странице Проверка и назначение выберите Проверка и назначение.

Создание событий для концентраторов событий с проверкой схемы

Создание консольного приложения для производителя событий

  1. Запустите Visual Studio 2019.
  2. Выберите Создать новый проект.
  3. В диалоговом окне Создать проект выполните следующие действия: Если это диалоговое окно не отображается, щелкните в меню пункт Файл, затем последовательно выберите Новый и Проект.
    1. Выберите язык программирования C#.

    2. Для типа приложения выберите значение Консоль.

    3. В списке результатов выберите Консольное приложение.

    4. Затем нажмите кнопку Далее.

      Изображение диалогового окна

  4. Укажите имя проекта OrderProducer и имя решения SRQuickStart, а затем нажмите ОК, чтобы создать проект.

Добавление пакета NuGet для Центров событий

  1. Выберите в меню элементы Инструменты>Диспетчер пакетов NuGet>Консоль диспетчера пакетов.

  2. Выполните следующие команды, чтобы установить Azure.Messaging.EventHubs и другие пакеты NuGet. Нажмите клавишу ВВОД , чтобы выполнить последнюю команду.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Настройке проверку подлинности приложений-производителей для подключения к Azure через Visual Studio, как описано здесь.

  4. Войдите в Azure с помощью учетной записи пользователя, которая является членом Schema Registry Reader роли на уровне пространства имен. Сведения о ролях реестра схем см . в разделе Реестр схем Azure в Центрах событий.

Создание кода с помощью схемы Avro

  1. Используйте то же содержимое, которое использовалось при создании схемы, чтобы создать файл с именем Order.avsc. Сохраните файл в папке проекта или решения.
  2. Затем на основе этого файла схемы можно сгенерировать код для .NET. Для создания кода можно использовать любое внешнее средство, например avrogen. Например, можно выполнить команду avrogen -s .\Order.avsc . для создания кода.
  3. После создания кода вы увидите файл с именем Order.cs в папке \Microsoft\Azure\Data\SchemaRegistry\example . Для приведенной выше схемы Avro создаются типы C# в пространстве имен Microsoft.Azure.Data.SchemaRegistry.example.
  4. Order.cs Добавьте файл в OrderProducer проект.

Написание кода для отправки событий в концентратор событий

  1. Добавьте в файл Program.cs указанный ниже код. Дополнительные сведения см. в комментариях к коду. Ниже приведены общие шаги в коде.

    1. Создайте клиент производителя, который можно использовать для отправки событий в концентратор событий.
    2. Создайте клиент реестра схем, который можно использовать для сериализации и проверки данных в объекте Order .
    3. Создайте объект Order с помощью созданного Order типа.
    4. Используйте клиент реестра схем для сериализации объекта в OrderEventData.
    5. Создайте пакет событий.
    6. Добавьте данные события в пакет событий.
    7. Используйте клиент производителя для отправки пакета событий в концентратор событий.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. Замените следующие значения заполнителей реальными значениями.

    • EVENTHUBSNAMESPACECONNECTIONSTRING — строка подключения для пространства имен Центров событий;
    • EVENTHUBNAME — имя концентратора событий;
    • EVENTHUBSNAMESPACENAME — имя пространства имен Центров событий;
    • SCHEMAGROUPNAME — имя группы схем;
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. Выполните сборку проекта и убедитесь, что она прошла без ошибок.

  4. Выполните программу и дождитесь подтверждающего сообщения.

    A batch of 1 order has been published.
    
  5. Получение событий концентратором событий можно проверить на портале Azure. Переключитесь на представление Сообщения в разделе Метрики. Обновите страницу, чтобы обновить диаграмму. На отображение полученных сообщений может уйти несколько секунд.

    Изображение страницы портала Azure, на которой можно проверить, получил ли события концентратор событий.

Использование событий из концентраторов событий с проверкой схемы

В этом разделе показано, как написать консольное приложение .NET Core, которое получает события из концентратора событий, и использовать реестр схем для десериализации данных событий.

Дополнительные требования

  • Создайте учетную запись хранения для использования обработчика событий.

Создание приложения-получателя

  1. В окне обозревателя решений щелкните правой кнопкой мыши решение SRQuickStart, выберите элемент Добавить и действие Создать проект.
  2. Выберите Консольное приложение и нажмите Далее.
  3. Введите OrderConsumer в поле Название проекта и выберите Создать.
  4. В окне Обозреватель решений щелкните правой кнопкой мыши OrderConsumer и выберите Установить как запускаемый проект.

Добавление пакета NuGet для Центров событий

  1. Выберите в меню элементы Инструменты>Диспетчер пакетов NuGet>Консоль диспетчера пакетов.

  2. В окне Консоли диспетчера пакетов убедитесь, что в качестве проекта по умолчанию выбран OrderConsumer. В противном случае выберите OrderConsumer из раскрывающегося списка.

  3. Выполните следующую команду, чтобы установить необходимые пакеты NuGet. Нажмите клавишу ВВОД , чтобы выполнить последнюю команду.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. Настройке проверку подлинности приложений-производителей для подключения к Azure через Visual Studio, как описано здесь.

  5. Войдите в Azure с помощью учетной записи пользователя, которая является членом Schema Registry Reader роли на уровне пространства имен. Сведения о ролях реестра схем см . в разделе Реестр схем Azure в Центрах событий.

  6. Добавьте файл, Order.cs созданный при создании приложения-производителя, в проект OrderConsumer .

  7. Щелкните правой кнопкой мыши проект OrderConsumer и выберите Назначить запускаемым проектом.

Написание кода для получения событий и их десериализации с помощью реестра схем

  1. Добавьте в файл Program.cs указанный ниже код. Дополнительные сведения см. в комментариях к коду. Ниже приведены общие шаги в коде.

    1. Создайте клиент потребителя, который можно использовать для отправки событий в концентратор событий.
    2. Создайте клиент контейнера BLOB-объектов для контейнера BLOB-объектов в хранилище BLOB-объектов Azure.
    3. Создайте клиент обработчика событий и зарегистрируйте обработчики событий и ошибок.
    4. В обработчике событий создайте клиент реестра схем, который можно использовать для десериализации данных событий в Order объект .
    5. Десериализация данных события в Order объект с помощью сериализатора.
    6. Вывод сведений о полученном заказе.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be userd as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. Замените следующие значения заполнителей реальными значениями.

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING — строка подключения для пространства имен Центров событий;
    • EVENTHUBNAME — имя концентратора событий;
    • EVENTHUBSNAMESPACENAME — имя пространства имен Центров событий;
    • SCHEMAGROUPNAME — имя группы схем;
    • AZURESTORAGECONNECTIONSTRING — строка подключения для учетной записи хранения Azure.
    • BLOBCONTAINERNAME — Имя контейнера BLOB-объектов
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. Выполните сборку проекта и убедитесь, что она прошла без ошибок.

  4. Запустите приложение получателя.

  5. Отобразится сообщение о том, что события успешно получены.

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    Здесь отображаются те же три события, которые вы ранее отправили в концентратор событий, выполняя программу отправителя.

Примеры

См. статью Readme в нашем репозитории GitHub.

Очистка ресурсов

Удалите пространство имен Центров событий или группу ресурсов, содержащую это пространство имен.

Дальнейшие действия