AKS 上の HDInsight クラスターで動作する Apache Flink® の Table API と SQL
大事な
AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 についてこのお知らせを参照してください。
ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。
大事な
この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案については、AskHDInsight に詳細を記載したリクエストを送信してください。また、最新情報を受け取るために Azure HDInsight Communityをフォローしてください。
Apache Flink には、統合ストリームとバッチ処理用の 2 つのリレーショナル API (Table API と SQL) が用意されています。 Table API は、選択、フィルター、結合などの関係演算子からのクエリを直感的に構成できる、言語統合クエリ API です。 Flink の SQL サポートは、SQL 標準を実装する Apache Calcite に基づいています。
Table API と SQL インターフェイスは、互いにシームレスに統合され、Flink の DataStream API と統合されます。 それらを基盤にしたすべての API とライブラリの間を簡単に切り替えることができます。
Apache Flink SQL
他の SQL エンジンと同様に、Flink クエリはテーブルの上で動作します。 Flink は保存データをローカルで管理しないため、従来のデータベースとは異なります。代わりに、そのクエリは外部テーブルに対して継続的に動作します。
Flink データ処理パイプラインは、ソース テーブルで始まり、シンク テーブルで終わります。 ソース テーブルでは、クエリの実行中に操作される行が生成されます。これらは、クエリの FROM 句で参照されるテーブルです。 コネクタには、HDInsight Kafka、HDInsight HBase、Azure Event Hubs、データベース、ファイルシステム、またはクラスパスにコネクタがあるその他のシステムを使用できます。
AKS クラスターでの HDInsight での Flink SQL クライアントの使用
この記事では、Azure portal で Secure Shell CLI を使用する方法について説明します。 開始する方法の簡単なサンプルを次に示します。
SQL クライアントを起動するには
./bin/sql-client.sh
sql-client と共に実行する初期化 SQL ファイルを渡すには
./sql-client.sh -i /path/to/init_file.sql
sql-client で構成を設定するには
SET execution.runtime-mode = streaming; SET sql-client.execution.result-mode = table; SET sql-client.execution.max-table-result.rows = 10000;
SQL DDL
Flink SQL では、次の CREATE ステートメントがサポートされます
- テーブルを作成
- データベースを作成
- カタログの作成
次に、jdbc コネクタを使用して MSSQL に接続するソース テーブルを定義する構文の例を示します。ID と名前は、CREATE TABLE ステートメントの列として指定します。
CREATE TABLE student_information (
id BIGINT,
name STRING,
address STRING,
grade STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
'table-name' = 'students',
'username' = 'username',
'password' = 'password'
);
データベース を作成する
CREATE DATABASE students;
のカタログを作成します。
CREATE CATALOG myhive WITH ('type'='hive');
これらのテーブルの上で連続クエリを実行できます。
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
ソース テーブルから シンク テーブル へ書き出します。
INSERT INTO grade_counts
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
依存関係の追加
JAR ステートメントは、ユーザー jar をクラスパスに追加したり、クラスパスからユーザー jar を削除したり、ランタイムのクラスパスに追加された jar を表示したりするために使用されます。
Flink SQL では、次の JAR ステートメントがサポートされています。
- JAR の追加
- JAR を表示する
- JAR の削除
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.
Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.
Flink SQL> SHOW JARS;
+----------------------------+
| jars |
+----------------------------+
| /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+
Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.
HDInsight の AKS 上の Apache Flink® クラスターにおける Hive メタストア
カタログは、データベース、テーブル、パーティション、ビュー、関数などのメタデータと、データベースまたは他の外部システムに格納されているデータにアクセスするために必要な情報を提供します。
AKS 上の HDInsight では、Flink では 2 つのカタログ オプションがサポートされています。
GenericInMemoryCatalog
GenericInMemoryCatalog は、カタログのメモリ内実装です。 すべてのオブジェクトは、SQL セッションの有効期間中のみ使用できます。
HiveCatalog の
HiveCatalog は、2 つの目的を果たします。は、純粋な Flink メタデータ用の永続的ストレージとして、および既存の Hive メタデータの読み取りと書き込みのインターフェイスとして使用できます。
手記
AKS クラスター上の HDInsight には、Apache Flink 用 Hive Metastore の統合オプションが付属しています。 クラスターの作成時に Hive メタストア 選択
Flink データベースを作成してカタログに登録する方法
この記事では、Azure ポータル上の Secure Shell から CLI を使用して Flink SQL クライアントを開始する方法について説明します。
sql-client.sh
セッションを開始するDefault_catalogは既定のメモリ内カタログです
メモリ内カタログの既定のデータベースを確認してみましょう
バージョン 3.1.2 の Hive カタログを作成して使用しましょう
CREATE CATALOG myhive WITH ('type'='hive'); USE CATALOG myhive;
手記
AKS 上の HDInsight では、Hive 3.1.2 と Hadoop 3.3.2がサポートされています。
hive-conf-dir
は場所/opt/hive-conf
に設定されますHive カタログにデータベースを作成し、セッションの既定にします (変更されない限り)。
Hive テーブルを作成して Hive カタログに登録する方法
パーティションなしのコネクタの種類 Hive の Flink テーブルを作成しましょう
CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
hive_tableにデータを挿入する
INSERT INTO hive_table SELECT 2, '10'; INSERT INTO hive_table SELECT 3, '20';
hive_tableからデータを読み取る
Flink SQL> SELECT * FROM hive_table; 2023-07-24 09:46:22,225 INFO org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3 +----+-------------+--------------------------------+ | op | x | days | +----+-------------+--------------------------------+ | +I | 3 | 20 | | +I | 2 | 10 | | +I | 1 | 5 | +----+-------------+--------------------------------+ Received a total of 3 rows
手記
Hive Warehouse ディレクトリは、Apache Flink クラスターの作成時に選択されたストレージ アカウントの指定されたコンテナーにあります。ディレクトリ hive/warehouse/
パーティションを使用してコネクタの種類のハイブの Flink テーブルを作成できます
CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
大事な
Apache Flink には既知の制限があります。 最後の 'n' 列は、ユーザー定義のパーティション列に関係なく、パーティションに対して選択されます。 FLINK-32596 Flink 言語を使用して Hive テーブルを作成すると、パーティション キーが間違っています。
参考
- Apache Flink Table API & SQL
- Apache、Apache Flink、Flink、および関連するオープンソースプロジェクト名は、Apache Software Foundation (ASF) の商標 です。