HDInsight on AKS の Apache Flink® で Hive カタログを使用する方法
Note
Azure HDInsight on AKS は 2025 年 1 月 31 日に廃止されます。 2025 年 1 月 31 日より前に、ワークロードを Microsoft Fabric または同等の Azure 製品に移行することで、ワークロードの突然の終了を回避する必要があります。 サブスクリプション上に残っているクラスターは停止され、ホストから削除されることになります。
提供終了日までは基本サポートのみが利用できます。
重要
現在、この機能はプレビュー段階にあります。 ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用されるその他の法律条項については、「Microsoft Azure プレビューの追加の使用条件」に記載されています。 この特定のプレビューについては、「Microsoft HDInsight on AKS のプレビュー情報」を参照してください。 質問や機能の提案については、詳細を記載した要求を AskHDInsight で送信してください。また、その他の更新については、Azure HDInsight コミュニティのフォローをお願いいたします。
この例では、Apache Flink の Hive カタログと共に、永続的なカタログとして Hive のメタストアを使用します。 この機能は、セッションをまたいで Flink に Kafka テーブルと MySQL テーブルのメタデータを格納するために使用します。 Flink は、Hive カタログに登録されている Kafka テーブルをソースとして使用し、ルックアップを実行して、結果を MySQL データベースにシンクします
前提条件
- Hive Metastore 3.1.2 を使用する HDInsight on AKS の Apache Flink クラスター
- HDInsight の Apache Kafka クラスター
- Kafka の使用に関する記事で説明されているように、ネットワークの設定を確実に完了する必要があります。これは、HDInsight on AKS および HDInsight クラスターを同じ VNet に確実に配置するためです
- MySQL 8.0.33
Apache Flink の Apache Hive
Flink は、Hive との二重の統合を提供します。
- 最初のステップは、セッションをまたいで Flink 固有のメタデータを保存するために、Flink の HiveCatalog と共に、永続的なカタログとして Hive メタストア (HMS) を使用することです。
- たとえば、ユーザーは HiveCatalog を使用して Kafka テーブルや ElasticSearch テーブルを Hive メタストアに保存し、後で SQL クエリで再利用できます。
- 2 つ目は、Hive テーブルの読み取りと書き込みの代替エンジンとして Flink を提供することです。
- HiveCatalog は、既存の Hive インストールと "そのまま" で互換性を持つように設計されています。 既存の Hive メタストアを変更したり、テーブルのデータ配置やパーティション分割を変更したりする必要はありません。
詳細については、Apache Hive に関するページを参照してください
環境の準備
HMS を使用して Apache Flink クラスターを作成する
Azure portal で HMS を使用して Apache Flink クラスターを作成します。詳しい手順については、Flink クラスターの作成に関する記事を参照してください。
クラスターの作成後、HMS が AKS 側で実行されているかどうかを確認します。
HDInsight でユーザー注文トランザクション データの Kafka トピックを準備する
次のコマンドを使用して、kafka クライアント の jar をダウンロードします。
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
以下を使用して tar ファイルを解凍します
tar -xvf kafka_2.12-3.2.0.tgz
Kafka トピックへのメッセージを生成します。
その他のコマンド:
Note
ブートストラップ サーバーを独自の kafka ブローカーのホスト名または IP に置き換える必要があります
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Azure 上の MySQL でユーザー注文マスター データを準備する
DB のテスト:
注文テーブルの準備:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
SSH を使用して、必要な Kafka コネクタと MySQL Database の jar をダウンロードする
Note
HDInsight kafka のバージョンと MySQL のバージョンに従って、正しいバージョンの jar をダウンロードします。
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
planner jar の移動
webssh ポッドの /opt にある jar ファイル flink-table-planner_2.12-1.17.0-....jar を /lib に移動し、/opt/flink-webssh/opt/ にある jar ファイル flink-table-planner-loader1.17.0-....jar を /lib から移動します。 詳細については、問題に関するページを参照してください。 planner jar を移動するには、次の手順を実行します。
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Note
追加の planner jar の移動は、Hive 言語または HiveServer2 エンドポイントを使用する場合にのみ必要です。 ただし、これは Hive 統合のための推奨セットアップです。
検証
bin/sql-client.sh を使用して Flink SQL に接続する
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Hive カタログを作成し、Flink SQL で Hive カタログに接続する
Note
Hive メタストアで Flink クラスターを既に使用しているため、追加の構成を実行する必要はありません。
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Apache Flink SQL で Kafka テーブルを作成する
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Apache Flink SQL で MySQL テーブルを作成する
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Flink SQL で上記の Hive カタログに登録されているテーブルを確認する
Flink SQL でユーザー トランザクション注文情報を MySQL のマスター注文テーブルにシンクする
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Azure Cloud Shell で Kafka のユーザー トランザクション注文データが MySQL のマスター注文テーブルに追加されているかどうか確認する
Kafka でさらに 3 つのユーザー注文を作成する
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL で Kafka テーブル データを確認する
Flink SQL> select * from kafka_user_orders;
Flink SQL で MySQL の注文テーブルに product_id=104
を挿入する
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Azure Cloud Shell で product_id = 104
レコードが MySQL の注文テーブルに追加されているか確認する
リファレンス
- Apache Hive
- Apache、Apache Hive、Hive、Apache Flink、Flink、関連するオープン ソース プロジェクト名は、Apache Software Foundation (ASF) の商標です。