你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将 Akka Streams 与适用于 Apache Kafka 的事件中心配合使用

本教程介绍如何在不更改协议客户端或运行你自己的群集的情况下通过 Apache Kafka 的事件中心支持来连接 Akka 流。

在本教程中,你将了解如何执行以下操作:

  • 创建事件中心命名空间
  • 克隆示例项目
  • 运行 Akka Streams 生产者
  • 运行 Akka Streams 使用者

注意

GitHub 上提供了此示例

先决条件

若要完成本教程,请确保具备以下先决条件:

  • 通读用于 Apache Kafka 的事件中心一文。
  • Azure 订阅。 如果没有订阅,请在开始之前创建一个免费帐户
  • Java 开发工具包 (JDK) 1.8+
    • 在 Ubuntu 上运行 apt-get install default-jdk,以便安装 JDK。
    • 请确保设置 JAVA_HOME 环境变量,使之指向在其中安装了 JDK 的文件夹。
  • 下载安装 Maven 二进制存档
    • 在 Ubuntu 上,可以通过运行 apt-get install maven 来安装 Maven。
  • Git
    • 在 Ubuntu 上,可以通过运行 sudo apt-get install git 来安装 Git。

创建事件中心命名空间

要从任何事件中心服务进行发送或接收,需要事件中心命名空间。 有关详细信息,请参阅创建事件中心。 请确保复制事件中心连接字符串,以供将来使用。

克隆示例项目

获得事件中心连接字符串后,即可克隆适用于 Kafka 的 Azure 事件中心存储库并导航到 akka 子文件夹:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/akka/java

运行 Akka Streams 生产者

使用所提供的 Akka Streams 生产者事例,将消息发送到事件中心服务。

提供事件中心 Kafka 终结点

生产者 application.conf

更新 producer/src/main/resources/application.conf 中的 bootstrap.serverssasl.jaas.config 值,以使用正确的身份验证将生产者定向到事件中心 Kafka 终结点。

akka.kafka.producer {
    #Akka Kafka producer properties can be defined here


    # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
    # can be defined in this configuration section.
    kafka-clients {
        bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
        sasl.mechanism=PLAIN
        security.protocol=SASL_SSL
        sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
    }
}

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅获取事件中心连接字符串。 下面是一个配置示例:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

通过命令行运行生产者

若要通过命令行运行生产者,生成 JAR,然后从 Maven 中运行(或使用 Maven 生成 JAR,然后通过向 classpath 添加必要的 Kafka Java 存档文件 [JAR] 在 Java 中运行):

mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestProducer"

生产者将开始向主题 test 中的事件中心发送事件,并将事件输出到 stdout。

运行 Akka Streams 使用者

使用所提供的使用者示例,接收来自事件中心的消息。

提供事件中心 Kafka 终结点

使用者 application.conf

更新 consumer/src/main/resources/application.conf 中的 bootstrap.serverssasl.jaas.config 值,以使用正确的身份验证将使用者定向到事件中心 Kafka 终结点。

akka.kafka.consumer {
    #Akka Kafka consumer properties defined here
    wakeup-timeout=60s

    # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
    # defined in this configuration section.
    kafka-clients {
       request.timeout.ms=60000
       group.id=akka-example-consumer

       bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
       sasl.mechanism=PLAIN
       security.protocol=SASL_SSL
       sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
    }
}

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅获取事件中心连接字符串。 下面是一个配置示例:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

通过命令行运行使用者

若要通过命令行运行使用者,生成 JAR,然后从 Maven 中运行(或使用 Maven 生成 JAR,然后通过向 classpath 添加必要的 Kafka JAR 在 Java 中运行):

mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestConsumer"

如果事件中心具有事件(例如,如果生产者也在运行),则使用者将开始接收来自主题 test 的事件。

请查看 Akka Streams Kafka 指南,了解有关 Akka Streams 的更多详细信息。

后续步骤

若要详细了解适用于 Kafka 的事件中心,请参阅以下文章: