如何在 AKS 上的 HDInsight 上将 Hive 目录与 Apache Flink® 配合使用
重要
AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 了解此公告的详细信息。
需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。
重要
此功能目前以预览版提供。 Microsoft Azure预览版补充使用条款 包括适用于处于测试版、预览版或尚未全面发布的Azure功能的更多法律条款。 有关此特定预览版的信息,请参阅 AKS 上的 Azure HDInsight 预览信息。 有关问题或功能建议,请在 AskHDInsight 上提交请求,并提供详细信息。关注我们以获取 Azure HDInsight 社区 的更多更新。
此示例使用 Hive 的元存储作为 Apache Flink 的 Hive Catalog 的持久性目录。 我们使用此功能跨会话在 Flink 上存储 Kafka 表和 MySQL 表元数据。 Flink 使用在 Hive Catalog 中注册的 Kafka 表作为数据源,执行一些数据查找,并将结果输出到 MySQL 数据库。
先决条件
- 使用 Hive 元存储 3.1.2 的 AKS 上的 HDInsight 上的 Apache Flink 群集
-
HDInsight 上的 Apache Kafka 群集
- 您需要确保网络设置已完善,正如 使用 Kafka中所述;以便确保 AKS 上的 HDInsight 和 HDInsight 群集在同一 VNet 中。
- MySQL 8.0.33
Apache Flink 上的 Apache Hive
Flink 提供与 Hive 的双重集成。
- 第一步是将 Hive 元存储(HMS)用作包含 Flink 的 HiveCatalog 的持久目录,用于跨会话存储 Flink 特定元数据。
- 例如,用户可以使用 HiveCatalog 将 Kafka 或 ElasticSearch 表存储在 Hive 元存储中,并在 SQL 查询中稍后重复使用它们。
- 第二种方法是提供 Flink 作为用于读取和写入 Hive 表的替代引擎。
- HiveCatalog 设计为与现有 Hive 安装兼容的“开箱即用”。 无需修改现有的 Hive 元存储或更改表的数据放置或分区。
有关详细信息,请参阅 Apache Hive
环境准备
使用 HMS 创建 Apache Flink 群集
让我们在 Azure 门户中使用 HMS 创建 Apache Flink 群集,您可以参阅关于 Flink 群集创建的详细说明。
创建群集后,请检查 HMS 是否在 AKS 端运行。
在 HDInsight 上准备用户订单事务数据 Kafka 主题
使用以下命令下载 kafka 客户端 jar:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
使用
tar -xvf kafka_2.12-3.2.0.tgz
将消息生成到 Kafka 主题。
其他命令:
注意
你需要将 bootstrap-server 替换为你自己的 Kafka broker 主机名或 IP 地址。
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
在 Azure 上的 MySQL 上准备用户订单主数据
测试数据库:
准备订单表:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
使用 SSH 下载所需的 Kafka 连接器和 MySQL 数据库 JAR 文件
注意
根据 HDInsight Kafka 版本和 MySQL 版本下载正确的 JAR 文件。
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
移动规划器 jar
移动位于 webssh pod 的 /opt 中的 jar 文件 flink-table-planner_2.12-1.17.0-。。。.jar 到 /lib,并将 jar 文件 flink-table-planner-loader1.17.0-。。。.jar 从 /lib 移动到 /opt/flink-webssh/opt/。 有关更多详细信息,请参阅 问题。 执行以下步骤以移动 Planner Jar。
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
注意
仅当使用 Hive 方言或 HiveServer2 终结点时,才需要进行额外的规划器 jar 移动。 但是,这是 Hive 集成的建议设置。
验证
使用 bin/sql-client.sh 连接到 Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
创建 Hive 目录并使用 Flink SQL 连接到 Hive 目录
注意
由于我们已经将 Flink 群集与 Hive 元存储配合使用,因此无需执行任何其他配置。
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
在 Apache Flink SQL 上创建 Kafka 表
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
在 Apache Flink SQL 上创建 MySQL 表
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
检查在上述 Hive 目录中在 Flink SQL 上注册的表。
将用户交易订单信息存入 Flink SQL 上 MySQL 的主订单表中。
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
在 Azure Cloud Shell 上的 MySQL 中,检查 Kafka 上的用户事务订单数据是否以主表顺序添加
在 Kafka 上再创建三个用户订单
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
检查 Flink SQL 上的 Kafka 表数据
Flink SQL> select * from kafka_user_orders;
在 Flink SQL 上的 MySQL 上将 product_id=104
插入订单表中
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
在 Azure Cloud Shell 的 MySQL 中检查是否已在订单表中添加了 product_id = 104
记录。
参考
- Apache Hive
- Apache、Apache Hive、Hive、Apache Flink、Flink 和关联的开源项目名称是 Apache Software Foundation (ASF)的 商标。