在 AKS 上的 HDInsight 上的 Apache Flink® 上创建 Apache Kafka® 表
重要
AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 了解更多信息,请查看此公告。
需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。
重要
此功能目前以预览版提供。 Microsoft Azure 预览版补充使用条款 包含适用于 beta 版、预览版或尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览的信息,请参阅 Azure HDInsight on AKS 预览信息。 有关问题或功能建议,请在 AskHDInsight 上提交请求,提供详细信息,并关注我们以获取 Azure HDInsight 社区 的更多更新。
使用此示例,了解如何在 Apache FlinkSQL 上创建 Kafka 表。
先决条件
Apache Flink 上的 Kafka SQL 连接器
Kafka 连接器允许从 Kafka 主题读取数据并将数据写入 Kafka 主题。 有关详细信息,请参阅 Apache Kafka SQL 连接器。
在 Flink SQL 上创建 Kafka 表
在 HDInsight Kafka 上准备主题和数据
使用 weblog.py 准备消息
import random
import json
import time
from datetime import datetime
user_set = [
'John',
'XiaoMing',
'Mike',
'Tom',
'Machael',
'Zheng Hu',
'Zark',
'Tim',
'Andrew',
'Pick',
'Sean',
'Luke',
'Chunck'
]
web_set = [
'https://google.com',
'https://facebook.com?id=1',
'https://tmall.com',
'https://baidu.com',
'https://taobao.com',
'https://aliyun.com',
'https://apache.com',
'https://flink.apache.com',
'https://hbase.apache.com',
'https://github.com',
'https://gmail.com',
'https://stackoverflow.com',
'https://python.org'
]
def main():
while True:
if random.randrange(10) < 4:
url = random.choice(web_set[:3])
else:
url = random.choice(web_set)
log_entry = {
'userName': random.choice(user_set),
'visitURL': url,
'ts': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
}
print(json.dumps(log_entry))
time.sleep(0.05)
if __name__ == "__main__":
main()
数据流发送到 Kafka 主题
sshuser@hn0-contsk:~$ python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
其他命令:
-- create topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic click_events --bootstrap-server wn0-contsk:9092
-- delete topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic click_events --bootstrap-server wn0-contsk:9092
-- consume topic
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic click_events --from-beginning
{"userName": "Luke", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Tom", "visitURL": "https://stackoverflow.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Chunck", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Chunck", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Andrew", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Pick", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Mike", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Zheng Hu", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Luke", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:44"}
Apache Flink SQL 客户端
有关如何使用 Secure Shell Flink SQL 客户端的详细说明。
将 Kafka SQL 连接器 & 依赖项下载到 SSH
我们在以下步骤中使用 Kafka 3.2.0 依赖项,您需要根据 HDInsight 群集上的 Kafka 版本更新命令。
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
连接到 Apache Flink SQL 客户端
现在,让我们使用 Kafka SQL 客户端 jar 连接到 Flink SQL 客户端。
msdata@pod-0 [ /opt/flink-webssh ]$ bin/sql-client.sh -j flink-connector-kafka-1.17.0.jar -j kafka-clients-3.2.0.jar
在 Apache Flink SQL 上创建 Kafka 表
让我们在 Flink SQL 上创建 Kafka 表,然后在 Flink SQL 上选择 Kafka 表。
您需要更新以下代码片段中的 Kafka 引导服务器 IP地址。
CREATE TABLE KafkaTable (
`userName` STRING,
`visitURL` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'click_events',
'properties.bootstrap.servers' = '<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092',
'properties.group.id' = 'my_group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
select * from KafkaTable;
生成 Kafka 消息
现在,让我们使用 HDInsight Kafka 向同一主题生成 Kafka 消息。
python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
Apache Flink SQL 上的表
可以在 Flink SQL 上监视表。
下面是 Flink Web UI 上的流式处理作业。
参考
- Apache Kafka SQL 连接器
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和关联的开源项目名称 Apache Software Foundation(ASF) 商标。