次の方法で共有


AKS 上の HDInsight クラスターで動作する Apache Flink® の Table API と SQL

大事な

AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 についてこのお知らせを参照してください。

ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。

大事な

この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案については、AskHDInsight に詳細を記載したリクエストを送信してください。また、最新情報を受け取るために Azure HDInsight Communityをフォローしてください。

Apache Flink には、統合ストリームとバッチ処理用の 2 つのリレーショナル API (Table API と SQL) が用意されています。 Table API は、選択、フィルター、結合などの関係演算子からのクエリを直感的に構成できる、言語統合クエリ API です。 Flink の SQL サポートは、SQL 標準を実装する Apache Calcite に基づいています。

Table API と SQL インターフェイスは、互いにシームレスに統合され、Flink の DataStream API と統合されます。 それらを基盤にしたすべての API とライブラリの間を簡単に切り替えることができます。

他の SQL エンジンと同様に、Flink クエリはテーブルの上で動作します。 Flink は保存データをローカルで管理しないため、従来のデータベースとは異なります。代わりに、そのクエリは外部テーブルに対して継続的に動作します。

Flink データ処理パイプラインは、ソース テーブルで始まり、シンク テーブルで終わります。 ソース テーブルでは、クエリの実行中に操作される行が生成されます。これらは、クエリの FROM 句で参照されるテーブルです。 コネクタには、HDInsight Kafka、HDInsight HBase、Azure Event Hubs、データベース、ファイルシステム、またはクラスパスにコネクタがあるその他のシステムを使用できます。

この記事では、Azure portal で Secure Shell CLI を使用する方法について説明します。 開始する方法の簡単なサンプルを次に示します。

  • SQL クライアントを起動するには

    ./bin/sql-client.sh
    
  • sql-client と共に実行する初期化 SQL ファイルを渡すには

    ./sql-client.sh -i /path/to/init_file.sql
    
  • sql-client で構成を設定するには

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL

Flink SQL では、次の CREATE ステートメントがサポートされます

  • テーブルを作成
  • データベースを作成
  • カタログの作成

次に、jdbc コネクタを使用して MSSQL に接続するソース テーブルを定義する構文の例を示します。ID と名前は、CREATE TABLE ステートメントの列として指定します。

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

データベース を作成する

CREATE DATABASE students;

のカタログを作成します。

CREATE CATALOG myhive WITH ('type'='hive');

これらのテーブルの上で連続クエリを実行できます。

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

ソース テーブルから シンク テーブル へ書き出します。

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

依存関係の追加

JAR ステートメントは、ユーザー jar をクラスパスに追加したり、クラスパスからユーザー jar を削除したり、ランタイムのクラスパスに追加された jar を表示したりするために使用されます。

Flink SQL では、次の JAR ステートメントがサポートされています。

  • JAR の追加
  • JAR を表示する
  • JAR の削除
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

カタログは、データベース、テーブル、パーティション、ビュー、関数などのメタデータと、データベースまたは他の外部システムに格納されているデータにアクセスするために必要な情報を提供します。

AKS 上の HDInsight では、Flink では 2 つのカタログ オプションがサポートされています。

GenericInMemoryCatalog

GenericInMemoryCatalog は、カタログのメモリ内実装です。 すべてのオブジェクトは、SQL セッションの有効期間中のみ使用できます。

HiveCatalog

HiveCatalog は、2 つの目的を果たします。は、純粋な Flink メタデータ用の永続的ストレージとして、および既存の Hive メタデータの読み取りと書き込みのインターフェイスとして使用できます。

手記

AKS クラスター上の HDInsight には、Apache Flink 用 Hive Metastore の統合オプションが付属しています。 クラスターの作成時に Hive メタストア 選択

この記事では、Azure ポータル上の Secure Shell から CLI を使用して Flink SQL クライアントを開始する方法について説明します。

  • sql-client.sh セッションを開始する

    既定の Hive カタログを示すスクリーンショット。

    Default_catalogは既定のメモリ内カタログです

  • メモリ内カタログの既定のデータベースを確認してみましょう 既定のメモリ内カタログを示すスクリーンショット。

  • バージョン 3.1.2 の Hive カタログを作成して使用しましょう

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    手記

    AKS 上の HDInsight では、Hive 3.1.2Hadoop 3.3.2がサポートされています。 hive-conf-dir は場所 /opt/hive-conf に設定されます

  • Hive カタログにデータベースを作成し、セッションの既定にします (変更されない限り)。 Hive カタログにデータベースを作成し、セッションの既定のカタログにすることを示すスクリーンショット。

Hive テーブルを作成して Hive カタログに登録する方法

  • Flink データベースを作成してカタログに登録する方法」の手順に従

  • パーティションなしのコネクタの種類 Hive の Flink テーブルを作成しましょう

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • hive_tableにデータを挿入する

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • hive_tableからデータを読み取る

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    手記

    Hive Warehouse ディレクトリは、Apache Flink クラスターの作成時に選択されたストレージ アカウントの指定されたコンテナーにあります。ディレクトリ hive/warehouse/

  • パーティションを使用してコネクタの種類のハイブの Flink テーブルを作成できます

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

大事な

Apache Flink には既知の制限があります。 最後の 'n' 列は、ユーザー定義のパーティション列に関係なく、パーティションに対して選択されます。 FLINK-32596 Flink 言語を使用して Hive テーブルを作成すると、パーティション キーが間違っています。

参考