如何在 AKS 上的 HDInsight 上将 Hive 目录与 Apache Flink® 配合使用

重要

AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 了解此公告的详细信息

需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。

重要

此功能目前以预览版提供。 Microsoft Azure预览版补充使用条款 包括适用于处于测试版、预览版或尚未全面发布的Azure功能的更多法律条款。 有关此特定预览版的信息,请参阅 AKS 上的 Azure HDInsight 预览信息。 有关问题或功能建议,请在 AskHDInsight 上提交请求,并提供详细信息。关注我们以获取 Azure HDInsight 社区 的更多更新。

此示例使用 Hive 的元存储作为 Apache Flink 的 Hive Catalog 的持久性目录。 我们使用此功能跨会话在 Flink 上存储 Kafka 表和 MySQL 表元数据。 Flink 使用在 Hive Catalog 中注册的 Kafka 表作为数据源,执行一些数据查找,并将结果输出到 MySQL 数据库。

先决条件

Flink 提供与 Hive 的双重集成。

  • 第一步是将 Hive 元存储(HMS)用作包含 Flink 的 HiveCatalog 的持久目录,用于跨会话存储 Flink 特定元数据。
    • 例如,用户可以使用 HiveCatalog 将 Kafka 或 ElasticSearch 表存储在 Hive 元存储中,并在 SQL 查询中稍后重复使用它们。
  • 第二种方法是提供 Flink 作为用于读取和写入 Hive 表的替代引擎。
  • HiveCatalog 设计为与现有 Hive 安装兼容的“开箱即用”。 无需修改现有的 Hive 元存储或更改表的数据放置或分区。

有关详细信息,请参阅 Apache Hive

环境准备

让我们在 Azure 门户中使用 HMS 创建 Apache Flink 群集,您可以参阅关于 Flink 群集创建的详细说明。

显示如何创建 Flink 群集的屏幕截图。

创建群集后,请检查 HMS 是否在 AKS 端运行。

显示如何在 Flink 群集中检查 HMS 状态的屏幕截图。

在 HDInsight 上准备用户订单事务数据 Kafka 主题

使用以下命令下载 kafka 客户端 jar:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

使用 解压 tar 文件

tar -xvf kafka_2.12-3.2.0.tgz

将消息生成到 Kafka 主题。

显示如何向 Kafka 主题生成消息的屏幕截图。

其他命令:

注意

你需要将 bootstrap-server 替换为你自己的 Kafka broker 主机名或 IP 地址。

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

在 Azure 上的 MySQL 上准备用户订单主数据

测试数据库:

显示如何在 Kafka 中测试数据库的屏幕截图。

显示如何在门户中运行 Cloud Shell 的屏幕截图。

准备订单表:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

使用 SSH 下载所需的 Kafka 连接器和 MySQL 数据库 JAR 文件

注意

根据 HDInsight Kafka 版本和 MySQL 版本下载正确的 JAR 文件。

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

移动规划器 jar

移动位于 webssh pod 的 /opt 中的 jar 文件 flink-table-planner_2.12-1.17.0-.jar 到 /lib,并将 jar 文件 flink-table-planner-loader1.17.0-.jar 从 /lib 移动到 /opt/flink-webssh/opt/。 有关更多详细信息,请参阅 问题。 执行以下步骤以移动 Planner Jar。

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

注意

仅当使用 Hive 方言或 HiveServer2 终结点时,才需要进行额外的规划器 jar 移动。 但是,这是 Hive 集成的建议设置。

验证

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

注意

由于我们已经将 Flink 群集与 Hive 元存储配合使用,因此无需执行任何其他配置。

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

显示如何创建 Kafka 表的屏幕截图。

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

显示如何创建 mysql 表的屏幕截图。

显示表输出的 屏幕截图。

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

屏幕截图显示如何中止用户事务。

显示 Flink UI 的屏幕截图。

在 Azure Cloud Shell 上的 MySQL 中,检查 Kafka 上的用户事务订单数据是否以主表顺序添加

显示如何检查用户事务的屏幕截图。

在 Kafka 上再创建三个用户订单

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

显示如何检查 Kafka 表数据的屏幕截图。

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

显示如何检查订单表的屏幕截图。

在 Azure Cloud Shell 的 MySQL 中检查是否已在订单表中添加了 product_id = 104 记录。

显示添加到订单表的记录的屏幕截图。

参考