Использование схемы JSON с приложениями Apache Kafka
В этом руководстве описан сценарий, в котором схемы JSON используются для сериализации и десериализации событий с помощью реестра схем Azure в Центрах событий.
В этом случае приложение производителя Kafka использует схему JSON, хранящуюся в реестре схем Azure, для сериализации события и публикации их в концентраторе событий Kafka в Центры событий Azure. Потребитель Kafka десериализирует события, которые он потребляет из Центров событий. Для этого используется идентификатор схемы события и схемы JSON, которая хранится в реестре схем Azure.
Необходимые компоненты
Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями, прежде чем приступить к работе с этим руководством.
Для работы с данным руководством необходимо следующее:
- Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начать работу.
- В среде разработки установите следующие компоненты:
- Комплект разработчика Java (JDK) 1.7+.
- Скачайте и установите двоичный архив Maven.
- Git
- Клонируйте репозиторий Схемы Azure для Kafka .
Создание концентратора событий
Следуйте инструкциям из краткого руководства. Создание пространства имен Центров событий и концентратора событий для создания пространства имен Центров событий и концентратора событий. Затем следуйте инструкциям из получения строка подключения, чтобы получить строка подключения в пространство имен Центров событий.
Запишите следующие параметры, которые вы используете в текущем кратком руководстве.
- Строка подключения пространства имен Центров событий
- Имя концентратора событий
Создание схемы
Следуйте инструкциям из руководства по созданию схем с помощью реестра схем, чтобы создать группу схем и схему.
Создайте группу схем с именем contoso-sg с помощью портала реестра схем. Используйте схему JSON в качестве типа сериализации.
В этой группе схем создайте новую схему JSON с именем схемы:
Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice
используя следующее содержимое схемы.{ "$id": "https://example.com/person.schema.json", "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "CustomerInvoice", "type": "object", "properties": { "invoiceId": { "type": "string" }, "merchantId": { "type": "string" }, "transactionValueUsd": { "type": "integer" }, "userId": { "type": "string" } } }
Регистрация приложения для доступа к реестру схем
Вы можете использовать идентификатор Microsoft Entra для авторизации производителя Kafka и клиентского приложения для доступа к ресурсам реестра схем Azure. Чтобы включить его, необходимо зарегистрировать клиентское приложение в клиенте Microsoft Entra из портал Azure.
Чтобы зарегистрировать приложение Microsoft Entra с именем example-app
, см. статью "Регистрация приложения с помощью клиента Microsoft Entra".
- tenant.id — задает идентификатор клиента приложения.
- client.id — задает идентификатор клиента приложения.
- client.secret — задает секрет клиента для проверки подлинности.
Если вы используете управляемое удостоверение, вам потребуется:
- use.managed.identity.credential — указывает, что учетные данные MSI следует использовать для виртуальной машины с поддержкой MSI.
- managed.identity.clientId — если задано, он создает учетные данные MSI с заданным идентификатором клиента managed.identity.resourceId, если он указан, он создает учетные данные MSI с заданным идентификатором ресурса.
Добавление пользователя в роль читателя реестра схем
Добавьте учетную запись пользователя в роль читателя реестра схем на уровне пространства имен. Вы также можете использовать роль участника реестра схем, но это не обязательно для этого краткого руководства.
- На странице пространства имен Центров событий выберите элемент управления доступом (IAM) в меню слева.
- На странице управления доступом (IAM) выберите +Добавить -Добавить> назначение ролей в меню.
- На странице "Тип назначения" нажмите кнопку "Далее".
- На странице "Роли" выберите средство чтения реестра схем и нажмите кнопку "Далее" в нижней части страницы.
- Используйте ссылку +Выбрать участников , чтобы добавить
example-app
приложение, созданное на предыдущем шаге, в роль, а затем нажмите кнопку "Далее". - На странице "Рецензирование и назначение" выберите "Рецензирование и назначение".
Обновление конфигурации клиентского приложения kafka
Необходимо обновить клиентскую конфигурацию производителей и потребительских приложений Kafka с сведениями о приложении Microsoft Entra и сведениями о реестре схем.
Чтобы обновить конфигурацию производителя Kafka, перейдите к azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.
Обновите конфигурацию приложения Kafka в src/main/resources/app.properties , следуя руководству по краткому руководству по Kafka для Центров событий.
Обновите сведения о конфигурации производителя в src/main/resources/app.properties с помощью связанной конфигурации реестра схем и приложения Microsoft Entra, созданного на предыдущем шаге, следующим образом:
schema.group=contoso-sg schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net tenant.id=<> client.id=<> client.secret=<>
Следуйте тем же инструкциям и обновите конфигурацию azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer .
Для производителей Kafka и потребительских приложений используется следующая схема JSON:
{ "$id": "https://example.com/person.schema.json", "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "CustomerInvoice", "type": "object", "properties": { "invoiceId": { "type": "string" }, "merchantId": { "type": "string" }, "transactionValueUsd": { "type": "integer" }, "userId": { "type": "string" } } }
Использование производителя Kafka с проверкой схемы JSON
Чтобы запустить приложение производителя Kafka, перейдите к azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.
Вы можете запустить приложение-производитель, чтобы создать определенные записи схемы JSON или универсальные записи. Для конкретного режима записей необходимо сначала создать классы для любой схемы производителя с помощью следующей команды maven:
mvn generate-sources
Затем можно запустить приложение производителя с помощью следующих команд.
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
После успешного выполнения приложения производителя он предложит выбрать сценарий производителя. В этом кратком руководстве можно выбрать вариант 1 — создать определенныеrecords.
Enter case number: 1 - produce SpecificRecords
После успешной сериализации и публикации данных в приложении производителя должны появиться следующие журналы консоли:
INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0 INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1 INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
Использование потребителя Kafka с проверкой схемы JSON
Чтобы запустить приложение потребителя Kafka, перейдите к azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer.
Вы можете запустить приложение-получатель, чтобы оно потребляло определенные записи или универсальные записи схемы JSON. Для конкретного режима записей необходимо сначала создать классы для любой схемы производителя с помощью следующей команды maven:
mvn generate-sources
Затем можно запустить приложение-получатель с помощью следующей команды.
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
После успешного выполнения приложения-получателя он предложит выбрать сценарий производителя. В этом кратком руководстве можно выбрать вариант 1— использовать specificRecords.
Enter case number: 1 - consume SpecificRecords
После успешного потребления и десериализации данных в приложении производителя должны появиться следующие журналы консоли:
INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0} INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1} INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
Очистка ресурсов
Удалите пространство имен Центров событий или удалите группу ресурсов, содержащую пространство имен.