你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

快速入门:使用 Azure 事件中心 和 Apache Kafka 流式传输数据

本快速入门介绍如何使用 Apache Kafka 协议将数据流式传输到 Azure 事件中心和从中流式传输出数据。 你不会更改示例 Kafka 生成者或使用者应用中的任何代码。 只是更新客户端用于指向事件中心命名空间的配置,该命名空间会公开 Kafka 终结点。 你也不会自行构建和使用 Kafka 群集。 相反,你会搭配使用事件中心命名空间与 Kafka 终结点。

注意

GitHub 上提供了此示例

先决条件

若要完成本快速入门,请确保符合以下先决条件:

创建一个 Azure 事件中心命名空间

当你创建事件中心命名空间时,系统会自动为该命名空间启用 Kafka 终结点。 可以从使用 Kafka 协议的应用程序,将事件流式传输到事件中心。 按照使用 Azure 门户创建事件中心中的分步说明创建事件中心命名空间。 如果使用专用群集,请参阅在专用群集中创建命名空间和事件中心

注意

基本层不支持适用于 Kafka 的事件中心。

在事件中心内使用 Kafka 发送和接收消息

  1. 为虚拟机启用系统分配的托管标识。 若要详细了解如何在 VM 上配置托管标识,请参阅使用 Azure 门户在 VM 上配置 Azure 资源托管标识。 Azure 资源的托管标识在 Microsoft Entra ID 中为 Azure 服务提供了一个自动托管标识。 可以使用此标识向支持 Microsoft Entra 身份验证的任何服务进行身份验证,这样就无需在代码中插入凭据了。

    Azure 门户中虚拟机页的“标识”选项卡的屏幕截图。

  2. 使用创建的事件中心命名空间的“访问控制”页,将 Azure 事件中心数据所有者角色分配给 VM 的托管标识。 Azure 事件中心支持使用 Microsoft Entra ID 对事件中心资源请求进行授权。 借助 Microsoft Entra ID,可使用 Azure 基于角色的访问控制 (Azure RBAC) 授予对安全主体的访问权限,该安全主体可能是用户或应用程序服务主体。

    1. 在 Azure 门户中,导航到你的事件中心命名空间。 转到左侧导航栏中的“访问控制(IAM)”。

    2. 选择“+ 添加”,然后选择“Add role assignment”。

      事件中心命名空间的“访问控制”页的屏幕截图。

    3. 在“角色”选项卡中,选择“Azure 事件中心数据所有者”,然后选择“下一步”按钮。

      屏幕截图显示选择了“Azure 事件中心数据所有者”角色。

    4. 在“成员”选项卡中的“将访问权限分配给”部分选择“托管标识”。

    5. 选择“+选择成员”链接。

    6. 在“选择托管标识”页上,执行以下步骤:

      1. 选择包含 VM 的 Azure 订阅。

      2. 对于“托管标识”,请选择“虚拟机”

      3. 选择虚拟机的托管标识。

      4. 选择页面底部的“选择”。

        屏幕截图显示“添加角色分配 -> 选择托管标识”页。

    7. 选择“查看 + 分配”。

      屏幕截图显示“添加角色分配”页,其中包含分配给 VM 的托管标识的角色。

  3. 重启 VM 并重新登录到为其配置了托管标识的 VM。

  4. 克隆用于 Kafka 的 Azure 事件中心存储库

  5. 导航到 azure-event-hubs-for-kafka/tutorials/oauth/java/managedidentity/consumer

  6. 切换到 src/main/resources/ 文件夹,打开 consumer.config。 将 namespacename 替换为事件中心命名空间的名称。

    bootstrap.servers=NAMESPACENAME.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler;
    

    注意

    可在此处找到 Kafka 的事件中心的所有 OAuth 示例。

  7. 切换回 pom.xml 文件所在的 Consumer 文件夹,使用 Kafka 客户端运行使用者代码并处理来自事件中心的事件:

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestConsumer"                                    
    
  8. 启动另一个命令提示符窗口,并导航到 azure-event-hubs-for-kafka/tutorials/oauth/java/managedidentity/producer

  9. 切换到 src/main/resources/ 文件夹,打开 producer.config。 将 mynamespace 替换为事件中心命名空间的名称。

  10. 切换回 pom.xml 文件所在的 Producer 文件夹,运行生成者代码并将事件流式传输到事件中心:

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    

    应会看到有关在生成者窗口中发送的事件的消息。 现在请检查使用者应用窗口,了解它从事件中心接收的消息。

    屏幕截图显示生成者和使用者应用窗口,其中显示了事件。

使用架构注册表对 Kafka 进行架构验证

使用事件中心通过 Kafka 应用程序流式传输数据时,可使用 Azure 架构注册表来执行架构验证。 事件中心的 Azure 架构注册表提供用于管理架构的集中式存储库,你可以将新的或现有的 Kafka 应用程序与架构注册表无缝连接。

若要了解详细信息,请参阅使用 Avro 验证 Apache Kafka 应用程序的架构

后续步骤

本文介绍了如何在不更改协议客户端或运行自己的群集的情况下,将事件流式传输到事件中心。 若要了解详细信息,请参阅针对 Azure 事件中心的 Apache Kafka 开发人员指南