将 Apache Kafka® on HDInsight 与 AKS 上的 HDInsight 上的 Apache Flink® 配合使用

重要

AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 了解更多信息,请参阅此公告

需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。

重要

此功能目前以预览版提供。 Microsoft Azure 预览版补充使用条款 包括适用于测试版、预览版或尚未投入一般可用性的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 AKS 上的 Azure HDInsight 预览信息。 有关问题或功能建议,请在 AskHDInsight 上提交请求,提供详细信息,并关注我们以获取有关 Azure HDInsight 社区 的更多更新。

Apache Flink 的已知用例是流分析。 使用 Apache Kafka 引入的数据流成为许多用户的热门选择。 Flink 和 Kafka 的典型安装从推送到 Kafka 的事件流开始,Flink 作业可以使用该流。

此示例在运行 Flink 1.17.0 的 AKS 群集上使用 HDInsight 来处理使用和生成 Kafka 主题的流数据。

注意

FlinkKafkaConsumer 已弃用,并将随 Flink 1.17 一起删除,请改用 KafkaSource。 FlinkKafkaProducer 已弃用,并将随 Flink 1.15 一起删除,请改用 KafkaSink。

先决条件

  • Kafka 和 Flink 都需要位于同一 VNet 中,或者两个群集之间应存在 vnet 对等互连。

  • 创建 VNet

  • 在同一 VNet中创建 Kafka 群集。 可以根据当前使用情况在 HDInsight 上选择 Kafka 3.2 或 2.4。

    显示如何在同一 VNet 中创建 Kafka 群集的屏幕截图。

  • 在虚拟网络部分添加 VNet 详细信息。

  • 在具有相同 VNet 的 AKS 集群池上创建 HDInsight。

  • 在创建的群集池中创建一个 Flink 群集。

Apache Kafka 连接器

Flink 提供了 Apache Kafka 连接器,用于从 Kafka 主题读取和写入数据,并保证精确一次语义。

Maven 依赖项

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

创建 Kafka 数据流接收端

Kafka 接收器提供用于构建 KafkaSink 实例的构建器类。 我们使用相同的构造汇聚器,并在 AKS 上运行的 HDInsight 的 Flink 集群中使用它。

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

编写 Java 程序 Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

在 Webssh 上,上传 jar 并提交 jar

显示 Flink 上运行的作业的屏幕截图。

在 Flink 控制台界面上

屏幕截图显示如何将 Kafka 主题打包的 jar 文件作为作业提交到 Flink。

生成主题 - 单击 Kafka

显示如何生成 Kafka 主题的屏幕截图。

消费主题 - Kafka 上的事件

显示如何使用 Kafka 主题的屏幕截图。

参考文献