你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 Avro (Java) 验证 Apache Kafka 应用程序的架构

在本快速入门指南中,我们将探讨如何使用事件中心的 Azure 架构注册表验证 Apache Kafka 应用程序中的事件。

在此用例中,Kafka 生成者应用程序会使用存储在 Azure 架构注册表中的 Avro 架构来序列化事件并将其发布到 Azure 事件中心中的 Kafka 主题/事件中心。 Kafka 使用者可从事件中心反序列化它使用的事件。 为此,它将使用事件的架构 ID 和存储在 Azure 架构注册表中的 Avro 架构。

Diagram showing schema serialization/de-serialization for Kafka applications using Avro schema.

先决条件

如果不熟悉 Azure 事件中心,请在阅读本快速入门之前参阅事件中心概述

若要完成本快速入门,需要具备以下先决条件:

创建事件中心

按照快速入门创建事件中心命名空间和事件中心中的说明创建事件中心命名空间和事件中心。 然后,按照获取连接字符串中的说明获取事件中心命名空间的连接字符串。

记下你在当前快速入门中所使用的以下设置:

  • 事件中心命名空间的连接字符串
  • 事件中心的名称

创建架构

按照使用架构注册表创建架构中的说明创建架构组和架构。

  1. 使用架构注册表门户创建名为 contoso-sg 的架构组。 使用“Avro”作为序列化类型,使用“无”作为兼容性模式。

  2. 在该架构组中,使用以下架构内容创建名为 Microsoft.Azure.Data.SchemaRegistry.example.Order 的新 Avro 架构。

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

注册应用程序以访问架构注册表

可以使用 Microsoft Entra ID 授权 Kafka 生成者和使用者应用程序访问 Azure 架构注册表资源,其方法是从 Azure 门户向 Microsoft Entra 租户注册客户端应用程序。

要注册名为 example-app 的 Microsoft Entra 应用程序,请参阅向 Microsoft Entra 租户注册应用程序

  • tenant.id - 设置应用程序的租户 ID
  • client.id - 设置应用程序的客户端 ID
  • client.secret - 设置用于身份验证的客户端密码

如果你在使用托管标识,则需要:

  • use.managed.identity.credential - 指示应使用 MSI 凭据,应将其用于已启用 MSI 的 VM
  • managed.identity.clientId - 如果指定,它将使用给定的客户端 ID 生成 MSI 凭据
  • managed.identity.resourceId - 如果指定,它将使用给定的资源 ID 生成 MSI 凭据

将用户添加到架构注册表读取者角色

在命名空间级别,将用户帐户添加到“架构注册表读取者”角色。 也可以使用“架构注册表参与者”角色,但本快速入门不需要这样做。

  1. 在“事件中心命名空间”页面上,从左侧菜单中选择“访问控制(IAM)”。
  2. 在“访问控制(IAM)”页面上,依次选择“+ 添加”-> 菜单上的“添加角色分配”。
  3. 在“分配类型”页面上,选择“下一步”。
  4. 在“角色”页面上,选择“架构注册表读取者(预览)”,然后选择页面底部的“下一步”。
  5. 使用“+ 选择成员”链接将上一步中创建的 example-app 应用程序添加到角色,然后选择“下一步”。
  6. 在“查看 + 分配”页面上,选择“查看 + 分配”。

更新 Kafka 应用程序的客户端应用程序配置

需要使用与所创建的 Microsoft Entra 应用程序相关的配置和架构注册表信息更新 Kafka 生成者和使用者应用程序的客户端配置。

要更新 Kafka 生成者配置,可导航到 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer

  1. 按照事件中心 Kafka 快速入门指南,在 src/main/resources/app.properties 中更新 Kafka 应用程序的配置。

  2. 使用上面创建的架构注册表相关配置和 Microsoft Entra 应用程序更新 src/main/resources/app.properties 中生成者的配置详细信息,如下所示:

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. 按照相同的说明操作,并更新 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer 配置。

  4. 对于 Kafka 生成者和使用者应用程序,可使用以下 Avro 架构:

    {
      "namespace": "com.azure.schemaregistry.samples",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    }
    

将 Kafka 生成者与 Avro 架构验证配合使用

要运行 Kafka 生成者应用程序,可导航到 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer

  1. 可以运行生成者应用程序,以便它可以生成特定于 Avro 的记录或通用记录。 对于特定记录模式,需要首先使用以下 maven 命令针对生成者架构生成类:

    mvn generate-sources
    
  2. 然后,可以使用以下命令运行生成者应用程序。

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. 成功执行生成者应用程序后,它会提示你选择生成者方案。 对于本快速入门,可以选择选项“1 - 生成 Avro SpecificRecords”。

    Enter case number:
    1 - produce Avro SpecificRecords
    2 - produce Avro GenericRecords
    
  4. 成功进行数据序列化和发布后,生成者应用程序中应会显示以下控制台日志:

    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"}
    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"}
    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
    

将 Kafka 使用者与 Avro 架构验证配合使用

要运行 Kafka 使用者应用程序,可导航到 azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer

  1. 可以运行使用者应用程序,以便它可以使用特定于 Avro 的记录或通用记录。 对于特定记录模式,需要首先使用以下 maven 命令针对生成者架构生成类:

    mvn generate-sources
    
  2. 然后,可以使用以下命令运行使用者应用程序。

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. 成功执行使用者应用程序后,它会提示你选择生成者方案。 对于本快速入门,可以选择选项“1 - 使用 Avro SpecificRecords”。

    Enter case number:
    1 - consume Avro SpecificRecords
    2 - consume Avro GenericRecords
    
  4. 成功使用和反序列化数据后,生成者应用程序中应会显示以下控制台日志:

    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"}
    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"}
    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
    

清理资源

删除事件中心命名空间或删除包含该命名空间的资源组。