使用 Akka Streams 搭配適用於 Apache Kafka 的事件中樞
本教學課程說明如何透過 Apache Kafka 的事件中樞支援連線至 Akka Streams,而不需要變更通訊協定用戶端或執行您自己的叢集。
在本教學課程中,您會了解如何:
- 建立事件中樞命名空間
- 複製範例專案
- 執行 Akka Streams 生產者
- 執行 Akka Streams 取用者
注意
您可在 GitHub 上取得此範例
必要條件
若要完成本教學課程,請確定您具有下列必要條件:
- 請參閱適用於 Apache Kafka 的事件中樞一文。
- Azure 訂用帳戶。 如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶。
- Java Development Kit (JDK) 1.8+ \(英文\)
- 在 Ubuntu 上,執行
apt-get install default-jdk
來安裝 JDK。 - 務必設定 JAVA_HOME 環境變數,以指向 JDK 安裝所在的資料夾。
- 在 Ubuntu 上,執行
- 下載並安裝 Maven 二進位封存檔
- 在 Ubuntu 上,您可以執行
apt-get install maven
來安裝 Maven。
- 在 Ubuntu 上,您可以執行
- Git
- 在 Ubuntu 上,您可以執行
sudo apt-get install git
來安裝 Git。
- 在 Ubuntu 上,您可以執行
建立事件中樞命名空間
您需要事件中樞命名空間,才能從任何事件中樞服務傳送或接收。 如需詳細資訊,請參閱建立事件中樞。 請務必複製事件中樞連接字串以供稍後使用。
複製範例專案
既然您已經有事件中樞連接字串,請複製適用於 Kafka 的 Azure 事件中樞存放庫,並瀏覽至 akka
子資料夾:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/akka/java
執行 Akka Streams 生產者
使用提供的 Akka Streams 生產者範例,傳送訊息到事件中樞服務。
提供事件中樞 Kafka 端點
生產者 application.conf
更新 producer/src/main/resources/application.conf
中 bootstrap.servers
和 sasl.jaas.config
的值,以使用正確的驗證將生產者導向至事件中樞 Kafka 端點。
akka.kafka.producer {
#Akka Kafka producer properties can be defined here
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}
重要
將 {YOUR.EVENTHUBS.CONNECTION.STRING}
取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
從命令列執行生產者
若要從命令行執行產生者,請產生 JAR,然後從 Maven 內執行 (或使用 Maven 產生 JAR,然後在 Java 中執行,方法是將必要的 Kafka Java 封存盤案 (JAR) 新增至 classpath):
mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestProducer"
生產者會開始將事件傳送到事件中樞 (位於主題 test
),並將事件印出至 stdout。
執行 Akka Streams 取用者
使用提供的取用者範例,接收來自事件中樞的訊息。
提供事件中樞 Kafka 端點
取用者 application.conf
更新 consumer/src/main/resources/application.conf
中 bootstrap.servers
和 sasl.jaas.config
的值,以使用正確的驗證將取用者導向至事件中樞 Kafka 端點。
akka.kafka.consumer {
#Akka Kafka consumer properties defined here
wakeup-timeout=60s
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# defined in this configuration section.
kafka-clients {
request.timeout.ms=60000
group.id=akka-example-consumer
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}
重要
將 {YOUR.EVENTHUBS.CONNECTION.STRING}
取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱取得事件中樞連接字串。 以下是範例組態:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
從命令列執行取用者
若要從命令行執行取用者,請產生 JAR,然後從 Maven 內執行 (或使用 Maven 產生 JAR,然後在 Java 中執行,方法是將必要的 Kafka JAR 新增至 classpath):
mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestConsumer"
如果事件中樞有事件 (例如您的生產者也正在執行),則取用者會開始接收來自主題 test
的事件。
如需有關 Akka Streams 的詳細資訊,請參閱 Akka Streams Kafka 指南 \(英文\)。
下一步
若要深入了解適用於 Kafka 的事件中樞,請參閱下列文章: