如何在 AKS 上使用 HDInsight 的 Apache Flink® 搭配 Hive Catalog
重要
AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解。
您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。
重要
這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需此特定預覽的資訊,請參閱 AKS 上的 Azure HDInsight 預覽資訊。 如有問題或功能建議,請至 AskHDInsight 提交請求,並關注我們以獲取 Azure HDInsight 社群 的更多更新。
此範例使用 Hive 的 Metastore 作為具有 Apache Flink Hive Catalog 的持久性目錄。 我們會使用這項功能,跨會話在 Flink 上儲存 Kafka 數據表和 MySQL 數據表元數據。 Flink 使用註冊在 Hive Catalog 的 Kafka 表作為來源,執行一些查詢並將結果匯入 MySQL 資料庫。
先決條件
- 使用Hive中繼存放區 3.1.2 在 AKS 上的 HDInsight 上 Apache Flink 叢集
- 在 HDInsight 上 Apache Kafka 叢集
- 您必須確保網路設定已完成,如 使用 Kafka中所述;以確保 AKS 上的 HDInsight 和 HDInsight 叢集位於相同的 VNet 中。
- MySQL 8.0.33
Apache Flink 上的 Apache Hive
Flink 提供與 Hive 的雙重整合。
- 第一步是使用 Hive 中繼資料儲存庫(HMS)作為 Flink 的 HiveCatalog 的持久性目錄,以在不同的會話間儲存 Flink 特定的元數據。
- 例如,使用者可以使用HiveCatalog將其Kafka或ElasticSearch資料表儲存在Hive中繼存放區中,並在稍後在SQL查詢中重複使用。
- 第二個是提供 Flink 作為讀取和寫入 Hive 數據表的替代引擎。
- HiveCatalog 是設計成與現有的 Hive 安裝相容的開箱即用解決方案。 您不需要修改現有的Hive中繼存放區,或變更數據表的數據放置或分割。
如需詳細資訊,請參閱 Apache Hive
環境準備
使用 HMS 建立 Apache Flink 叢集
讓我們在 Azure 入口網站上建立具有 HMS 的 Apache Flink 叢集,請參閱建立 Flink 叢集的詳細指示。
建立叢集之後,請檢查 HMS 是否在 AKS 端執行。
在 HDInsight 上準備使用者訂單交易數據的 Kafka 主題
使用下列命令下載 Kafka 客戶端 JAR:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
使用 解壓縮 tar 檔案
tar -xvf kafka_2.12-3.2.0.tgz
產生 Kafka 主題的訊息。
其他命令:
注意
您必須將 bootstrap-server 取代為您自己的 Kafka 代理伺服器主機名稱或 IP
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
在 Azure 上的 MySQL 上準備用戶訂單主要數據
測試 DB:
準備訂單數據表:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
使用 SSH 下載所需的 Kafka 連接器和 MySQL 資料庫的 jar 檔案。
注意
根據 HDInsight kafka 版本和 MySQL 版本下載正確的 jar 文件版本。
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
移動規劃器罐
將位於 webssh pod 的 /opt 的 jar flink-table-planner_2.12-1.17.0-。。。.jar 移動到 /lib,並將位於 /lib 的 jar flink-table-planner-loader1.17.0-。。。.jar 移動出到 /opt/flink-webssh/opt/。 如需詳細資訊,請參閱 問題。 執行下列步驟來移動 planner jar 檔案。
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
注意
只有在使用 Hive 方言或 HiveServer2 端點時,才需要額外移動規劃器 jar。 不過,這是Hive整合的建議設定。
驗證
使用 bin/sql-client.sh 連線到 Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
建立Hive目錄並連線到 Flink SQL 上的 Hive 目錄
注意
由於我們已經搭配Hive中繼存放區使用 Flink 叢集,因此不需要執行任何其他設定。
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
在 Apache Flink SQL 上建立 Kafka 數據表
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
在 Apache Flink SQL 上建立 MySQL 數據表
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
檢視 Flink SQL 上述的 Hive 目錄中註冊的數據表
將使用者交易訂單資訊寫入 MySQL 的 Flink SQL 主訂單表中
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
檢查 Kafka 上的使用者交易訂單數據是否已新增至 Azure Cloud Shell 上 MySQL 的主表訂單中。
在 Kafka 上建立另外三個用戶訂單
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
檢查 Flink SQL 上的 Kafka 數據表數據
Flink SQL> select * from kafka_user_orders;
將 product_id=104
插入 Flink SQL 上 MySQL 的訂單數據表
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
請確認 product_id = 104
記錄是否已在 Azure Cloud Shell 上的 MySQL 的訂單表中新增
參考
- Apache Hive
- Apache、Apache Hive、Hive、Apache Flink、Flink 和相關聯的開放原始碼專案名稱是 商標 的 Apache Software Foundation(ASF)。