将 AKS 上的 HDInsight 上的 Apache Flink® 与适用于 Apache Kafka® 的 Azure 事件中心连接

重要

AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 了解此公告的详细信息

需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。

重要

此功能目前以预览版提供。 Microsoft Azure 预览版补充使用条款 包括适用于处于测试版、预览版或尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 Azure HDInsight 在 AKS 上的预览信息 。 有关问题或功能建议,请在 AskHDInsight 上提交请求,并提供详细信息。关注我们,以获取更多关于 Azure HDInsight 社区 的更新。

Apache Flink 的已知用例是流分析。 许多用户最常选择使用通过 Apache Kafka 引入的数据流。 典型的 Flink 和 Kafka 安装是从将事件流推送到 Kafka 开始,Flink 作业可以消费这些流。 Azure 事件中心在事件中心上提供 Apache Kafka 终结点,使用户能够使用 Kafka 协议连接到事件中心。

本文介绍如何将 Azure 事件中心 与 AKS 上的 hdInsight 上的 Apache Flink 连接,并介绍以下内容

  • 创建事件中心命名空间
  • 使用 Apache Flink 在 AKS 群集上创建 HDInsight
  • 运行 Flink 生成者
  • Apache Flink 的 Jar 包
  • 作业提交 & 验证

创建事件中心命名空间和事件中心

  1. 若要创建事件中心命名空间和事件中心,请参阅此处

    显示事件中心设置的 屏幕截图。

  1. 在 AKS 群集池上使用现有的 HDInsight,可以创建 Flink 群集

  2. 运行 Flink 生成者,添加 bootstrap.serversproducer.config 信息

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. {YOUR.EVENTHUBS.CONNECTION.STRING} 替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅有关如何 获取事件中心连接字符串的详细信息。

    例如

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Package com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. 添加代码片段以运行 Flink 生成者。

    显示如何在事件中心测试 Flink 的屏幕截图。

  3. 执行代码后,事件将存储在主题 “topic1”

    显示主题中存储的事件中心的屏幕截图。

参考