在 AKS 上的 HDInsight 上的 Apache Flink® 上创建 Apache Kafka® 表

重要

AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 了解更多信息,请查看此公告

需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。

重要

此功能目前以预览版提供。 Microsoft Azure 预览版补充使用条款 包含适用于 beta 版、预览版或尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览的信息,请参阅 Azure HDInsight on AKS 预览信息。 有关问题或功能建议,请在 AskHDInsight 上提交请求,提供详细信息,并关注我们以获取 Azure HDInsight 社区 的更多更新。

使用此示例,了解如何在 Apache FlinkSQL 上创建 Kafka 表。

先决条件

Kafka 连接器允许从 Kafka 主题读取数据并将数据写入 Kafka 主题。 有关详细信息,请参阅 Apache Kafka SQL 连接器

在 HDInsight Kafka 上准备主题和数据

使用 weblog.py 准备消息

import random
import json
import time
from datetime import datetime

user_set = [
        'John',
        'XiaoMing',
        'Mike',
        'Tom',
        'Machael',
        'Zheng Hu',
        'Zark',
        'Tim',
        'Andrew',
        'Pick',
        'Sean',
        'Luke',
        'Chunck'
]

web_set = [
        'https://google.com',
        'https://facebook.com?id=1',
        'https://tmall.com',
        'https://baidu.com',
        'https://taobao.com',
        'https://aliyun.com',
        'https://apache.com',
        'https://flink.apache.com',
        'https://hbase.apache.com',
        'https://github.com',
        'https://gmail.com',
        'https://stackoverflow.com',
        'https://python.org'
]

def main():
        while True:
                if random.randrange(10) < 4:
                        url = random.choice(web_set[:3])
                else:
                        url = random.choice(web_set)

                log_entry = {
                        'userName': random.choice(user_set),
                        'visitURL': url,
                        'ts': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
                }

                print(json.dumps(log_entry))
                time.sleep(0.05)

if __name__ == "__main__":
    main()

数据流发送到 Kafka 主题

sshuser@hn0-contsk:~$ python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events

其他命令:

-- create topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic click_events --bootstrap-server wn0-contsk:9092

-- delete topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete  --topic click_events --bootstrap-server wn0-contsk:9092

-- consume topic
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic click_events --from-beginning
{"userName": "Luke", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Tom", "visitURL": "https://stackoverflow.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Chunck", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Chunck", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Andrew", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Pick", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Mike", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Zheng Hu", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Luke", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:44"}

有关如何使用 Secure Shell Flink SQL 客户端的详细说明。

将 Kafka SQL 连接器 & 依赖项下载到 SSH

我们在以下步骤中使用 Kafka 3.2.0 依赖项,您需要根据 HDInsight 群集上的 Kafka 版本更新命令。

wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

现在,让我们使用 Kafka SQL 客户端 jar 连接到 Flink SQL 客户端。

msdata@pod-0 [ /opt/flink-webssh ]$ bin/sql-client.sh -j flink-connector-kafka-1.17.0.jar -j kafka-clients-3.2.0.jar

让我们在 Flink SQL 上创建 Kafka 表,然后在 Flink SQL 上选择 Kafka 表。

您需要更新以下代码片段中的 Kafka 引导服务器 IP地址。

CREATE TABLE KafkaTable (
`userName` STRING,
`visitURL` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'click_events',
'properties.bootstrap.servers' = '<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092',
'properties.group.id' = 'my_group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);

select * from KafkaTable;

显示如何在 Flink SQL 上创建和选择 Kafka 表的屏幕截图。

生成 Kafka 消息

现在,让我们使用 HDInsight Kafka 向同一主题生成 Kafka 消息。

python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events

可以在 Flink SQL 上监视表。

显示如何监视 Flink SQL 上的表日期的屏幕截图。

下面是 Flink Web UI 上的流式处理作业。

显示 Flink Web UI 上的作业的屏幕截图。

参考