次の方法で共有


AKS 上の HDInsight 上の Apache Flink® で Hive カタログを使用する方法

大事な

AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 この発表 からについてさらに学んでください。

ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。

大事な

この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案については、AskHDInsight に詳細を記載したリクエストを送信し、Azure HDInsight Communityをフォローして、最新の更新情報をチェックしてください。

この例では、Apache Flink の Hive カタログを持つ永続的なカタログとして Hive のメタストアを使用します。 この機能は、セッション間で Flink に Kafka テーブルと MySQL テーブルのメタデータを格納するために使用します。 Flink は、Hive カタログに登録されている Kafka テーブルをソースとして使用し、MySQL データベースに対して参照とシンクの結果を実行します

前提 条件

Flink は、Hive と二重の統合を提供します。

  • 最初の手順では、セッション間で Flink 固有のメタデータを格納するために、Flink の HiveCatalog を使用して永続的なカタログとして Hive Metastore (HMS) を使用します。
    • たとえば、ユーザーは HiveCatalog を使用して Kafka テーブルまたは ElasticSearch テーブルを Hive Metastore に格納し、後で SQL クエリで再利用できます。
  • 2 つ目は、Hive テーブルの読み取りと書き込みの代替エンジンとして Flink を提供することです。
  • HiveCatalog は、既存の Hive インストールと互換性のある "すぐに使用できる" よう設計されています。 既存の Hive メタストアを変更したり、テーブルのデータ配置やパーティション分割を変更したりする必要はありません。

詳細については、Apache Hive に関する説明を参照してください。

環境の準備

Azure portal で HMS を使用して Apache Flink クラスターを作成します。Flink クラスターの作成 詳細な手順を参照できます。

Flink クラスターを作成する方法を示すスクリーンショット。

クラスターの作成後、HMS が AKS 側で実行されているかどうかを確認します。

Flink クラスターで HMS の状態を確認する方法を示すスクリーンショット。

HDInsight でユーザーの注文トランザクションデータ用のKafkaトピックを準備する

次のコマンドを使用して、kafka クライアント jar をダウンロードします。

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

tar ファイルをアンターする

tar -xvf kafka_2.12-3.2.0.tgz

Kafka トピックへのメッセージを生成します。

Kafka トピックへのメッセージを生成する方法を示すスクリーンショット。

その他のコマンド:

手記

ブートストラップ サーバーを独自の kafka ブローカーのホスト名または IP に置き換える必要があります

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Azure 上の MySQL でユーザー注文マスター データを準備する

DB のテスト:

Kafka でデータベースをテストする方法を示すスクリーンショット。

ポータルで Cloud Shell を実行する方法を示すスクリーンショット。

注文テーブルの準備:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

SSH ダウンロードが必要な Kafka コネクタと MySQL Database jar を使用する

手記

HDInsight kafka のバージョンと MySQL のバージョンに従って、正しいバージョンの jar をダウンロードします。

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

プランナージャーの移動

jar を移動します。webssh ポッドの /opt に配置されている flink-table-planner_2.12-1.17.0-....jar を /lib に移動し、/lib にある flink-table-planner-loader1.17.0-....jar を /opt/flink-webssh/opt/ に移動します。 詳細については、問題の を参照してください。 Planner jar を移動するには、次の手順を実行します。

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

手記

追加のプランナー jar の移動は、Hive 言語または HiveServer2 エンドポイントを使用する場合にのみ必要です。 ただし、これは Hive 統合に推奨されるセットアップです。

検証

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

手記

Hive Metastore で Flink クラスターを既に使用しているため、追加の構成を実行する必要はありません。

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Kafka テーブルを作成する方法を示すスクリーンショット。

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

mysql テーブルを作成する方法を示すスクリーンショット。

テーブルの出力を示すスクリーンショット。

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

ユーザー トランザクションをシンクする方法を示すスクリーンショット。

Flink UI を示すスクリーンショット。

Kafka のユーザー トランザクション注文データが Azure Cloud Shell 上の MySQL でマスター テーブルの順序で追加されているかどうかを確認する

ユーザー トランザクションを確認する方法を示すスクリーンショット。

Kafka でさらに 3 つのユーザー注文を作成する

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Kafka テーブル データを確認する方法を示すスクリーンショット。

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

注文テーブルを確認する方法を示すスクリーンショット。

Azure Cloud Shell 上の MySQL オーダーテーブルに product_id = 104 のレコードが追加されていることを確認する

注文テーブルに追加されたレコードを示すスクリーンショット。

参考

  • Apache Hive
  • Apache、Apache Hive、Hive、Apache Flink、Flink、および関連するオープンソースプロジェクト名は、Apache Software Foundation (ASF) の 商標です。