将 AKS 上的 HDInsight 上的 Apache Flink® 与适用于 Apache Kafka® 的 Azure 事件中心连接
重要
AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 了解此公告的详细信息。
需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。
重要
此功能目前以预览版提供。 Microsoft Azure 预览版补充使用条款 包括适用于处于测试版、预览版或尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 Azure HDInsight 在 AKS 上的预览信息 。 有关问题或功能建议,请在 AskHDInsight 上提交请求,并提供详细信息。关注我们,以获取更多关于 Azure HDInsight 社区 的更新。
Apache Flink 的已知用例是流分析。 许多用户最常选择使用通过 Apache Kafka 引入的数据流。 典型的 Flink 和 Kafka 安装是从将事件流推送到 Kafka 开始,Flink 作业可以消费这些流。 Azure 事件中心在事件中心上提供 Apache Kafka 终结点,使用户能够使用 Kafka 协议连接到事件中心。
本文介绍如何将 Azure 事件中心 与 AKS 上的 hdInsight 上的 Apache Flink 连接,并介绍以下内容
- 创建事件中心命名空间
- 使用 Apache Flink 在 AKS 群集上创建 HDInsight
- 运行 Flink 生成者
- Apache Flink 的 Jar 包
- 作业提交 & 验证
创建事件中心命名空间和事件中心
在 AKS 上的 HDInsight 上设置 Flink 群集
在 AKS 群集池上使用现有的 HDInsight,可以创建 Flink 群集
运行 Flink 生成者,添加 bootstrap.servers 和
producer.config
信息bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
将
{YOUR.EVENTHUBS.CONNECTION.STRING}
替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅有关如何 获取事件中心连接字符串的详细信息。例如
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
打包适用于 Flink 的 JAR
Package com.example.app;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
添加代码片段以运行 Flink 生成者。
执行代码后,事件将存储在主题 “topic1”
参考
- Apache Flink 网站
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和关联的开源项目名称 Apache Software Foundation(ASF) 商标。