次の方法で共有


ゲートウェイ モードで SQL クライアント CLI を起動する

Note

Azure HDInsight on AKS は 2025 年 1 月 31 日に廃止されます。 2025 年 1 月 31 日より前に、ワークロードを Microsoft Fabric または同等の Azure 製品に移行することで、ワークロードの突然の終了を回避する必要があります。 サブスクリプション上に残っているクラスターは停止され、ホストから削除されることになります。

提供終了日までは基本サポートのみが利用できます。

重要

現在、この機能はプレビュー段階にあります。 ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用されるその他の法律条項については、「Microsoft Azure プレビューの追加の使用条件」に記載されています。 この特定のプレビューについては、「Microsoft HDInsight on AKS のプレビュー情報」を参照してください。 質問や機能の提案については、詳細を記載した要求を AskHDInsight で送信してください。また、その他の更新情報については、Azure HDInsight コミュニティのフォローをお願いいたします。

このチュートリアルでは、AKS 上の HDInsight 上の Apache Flink クラスター 1.17.0 で SQL クライアント CLI をゲートウェイ モードで起動する方法について説明します。 ゲートウェイ モードでは、CLI は SQL を指定されたリモート ゲートウェイに送信してステートメントを実行します。

./bin/sql-client.sh gateway --endpoint <gateway address>

Note

AKS 上の HDInsight 上の Apache Flink クラスターでは、外部接続はすべて 443 ポートを経由します。 ただし、内部的には、ポート 8083 をリッスンしている sql-gateway サービスに要求が再ルーティングされます。

AKS 側で SQL ゲートウェイ サービスを確認します。

SQL ゲートウェイの確認方法を示すスクリーンショット。

Flink の Table & SQL API を使用すると、SQL 言語で記述されたクエリを操作できますが、これらのクエリは Java または Scala で記述されたテーブル プログラム内に埋め込む必要があります。 さらに、これらのプログラムは、クラスターに送信する前に、ビルド ツールを使用してパッケージ化する必要があります。 この機能があるため、Flink の使用は Java/Scala プログラマに限定されます。

SQL クライアントは、Java コードまたは Scala コードを 1 行も使用せずに、テーブル プログラムを記述し、デバッグし、Flink クラスターに送信する簡単な方法を提供することを目的としています。 SQL クライアント CLI を使用すると、コマンド ラインで実行中の分散型アプリケーションからリアルタイムの結果を取得して視覚化することができます。

詳細については、「webssh で Flink SQL CLI クライアントに入る方法」を参照してください。

SQL ゲートウェイは、複数のクライアントがリモートからコンカレンシーで SQL を実行できるようにするサービスです。 これにより、Flink ジョブを送信し、メタデータを検索して、オンラインでデータを分析する簡単な方法が提供されます。

詳細については、「SQL ゲートウェイ」を参照してください。

AKS 上の HDInsight 上の Apache Flink クラスターで、次のコマンドを実行して SQL クライアント CLI をゲートウェイ モードで起動します。

./bin/sql-client.sh gateway --endpoint host:port
 
or
 
./bin/sql-client.sh gateway --endpoint https://fqdn/sql-gateway

Azure portal でクラスター エンドポイント (ホストまたは FQDN) を取得します。

クラスター エンドポイントを示すスクリーンショット。

テスト

準備

  1. ローカル Windows マシンで https://aka.ms/hdionaksflink117clilinux から Flink CLI をダウンロードします。

これをローカル Windows マシンで機能させるには、Linux 用 Windows サブシステムをインストールします。

  1. Windows コマンドを開いて実行し (JAVA_HOME と flink-cli のパスを独自のパスに置き換えて)、flink-cli をダウンロードします。

    Windows Subsystem for Linux --distribution Ubuntu 
    export JAVA_HOME=/mnt/c/Work/99_tools/zulu11.56.19-ca-jdk11.0.15-linux_x64
    cd <folder>
    wget https://hdiconfigactions.blob.core.windows.net/hiloflink17blob/flink-cli.tgz
    tar -xvf flink-cli.tgz
    
  2. flink-conf.yaml でエンドポイント、テナント ID、ポート 443 を設定します。

    user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli$ cd conf
    user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli/conf$ ls -l
    total 8
    -rwxrwxrwx 1 user user 2451 Feb 26 20:33 flink-conf.yaml
    -rwxrwxrwx 1 user user 2946 Feb 23 14:13 log4j-cli.properties
    
    user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli/conf$ cat flink-conf.yaml
    
    rest.address: <flink cluster endpoint on Azure portal>
    azure.tenant.id: <tenant ID>
    rest.port: 443
    
  3. AKS クラスター サブネットのネットワーク セキュリティのインバウンド ルールで HDInsight への VPN アクセスを許可し、ポート 443 を介したローカル Windows のパブリック IP を許可リストに登録します。

    パブリック IP アドレスを許可する方法を示すスクリーンショット。

  4. Flink-cli で sql-client.sh をゲートウェイ モードで実行して Flink SQL にアクセスします。

    bin/sql-client.sh gateway --endpoint https://fqdn/sql-gateway
    

    user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli$ bin/sql-client.sh gateway --endpoint https://fqdn/sql-gateway
    
                                       ▒▓██▓██▒
                                   ▓████▒▒█▓▒▓███▓▒
                                ▓███▓░░        ▒▒▒▓██▒  ▒
                              ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                              ██▒         ░▒▓███▒    ▒█▒█▒
                                ░▓█            ███   ▓░▒██
                                  ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                                █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                                ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                             ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                       ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                      ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                    ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
                   ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
                  ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
               ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
               ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
               ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
               ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
              ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
              █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
              ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
              ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
               ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
               ▓█   ▒█▓   ░     █░                ▒█              █▓
                █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
                 █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
                  ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
                   ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                    ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                      ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                          ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░
    
        ______ _ _       _       _____  ____  _         _____ _ _            _  BETA
       |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
       | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
       |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
       | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
       |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
    
            Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/user/.flink-sql-history
    
  5. 外部ソースを使用してテーブルに対してクエリを実行する前に、関連する jar を準備します。 次の例では、Flink SQL で kafka テーブル、mysql テーブルに対してクエリを実行します。 jar をダウンロードし、Flink クラスターに接続された Azure Data Lake Storage gen2 ストレージに配置します。

    Azure portal の Azure Data Lake Storage gen2 にある jar:

    Azure portal の jar ファイルを示すスクリーンショット。

  6. 既に作成されているテーブルを使用して、管理のために Hive メタストアに配置した後、クエリを実行します。

    Note

    この例では、AKS 上の HDInsight 内のすべての jar はデフォルトで Azure Data Lake Storage Gen2 です。 コンテナーとストレージ アカウントは、クラスターの作成時に指定したものと同じである必要はありません。 必要に応じて、別のストレージ アカウントを指定し、クラスター ユーザー マネージド ID に Azure Data Lake Storage Gen2 側のストレージ BLOB データ所有者ロールを付与できます。

    CREATE CATALOG myhive WITH (
        'type' = 'hive'
    );
    
    USE CATALOG myhive;
    
    // ADD jar into environment
    ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-jdbc-3.1.0-1.17.jar';
    ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/mysql-connector-j-8.0.33.jar';
    ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/kafka-clients-3.2.0.jar';
    ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-kafka-1.17.0.jar';
    
    Flink SQL> show jars;
    ----------------------------------------------------------------------------------------------+
    |                                                                                         jars |
    +----------------------------------------------------------------------------------------------+
    |    abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-kafka-1.17.0.jar |
    | abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-jdbc-3.1.0-1.17.jar |
    |             abfs://<container>@<storage name>.dfs.core.windows.net/jar/kafka-clients-3.2.0.jar |
    |        abfs://<container>@<storage name>.dfs.core.windows.net/jar/mysql-connector-j-8.0.33.jar |
    +----------------------------------------------------------------------------------------------+
    4 rows in set
    
    Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
    [INFO] Execute statement succeed.
    
    Flink SQL> show tables;
    +----------------------+
    |           table name |
    +----------------------+
    | flightsintervaldata1 |
    |    kafka_user_orders |
    |           kafkatable |
    |    mysql_user_orders |
    |               orders |
    +----------------------+
    5 rows in set
    
    // mysql cdc table
    Flink SQL> select * from mysql_user_orders;
    +----+-------------+----------------------------+-------------+--------------------------------+--------------+-------------+--------------+
    | op |    order_id |                 order_date | customer_id |                  customer_name |        price |  product_id | order_status |
    +----+-------------+----------------------------+-------------+--------------------------------+--------------+-------------+--------------+
    | +I |       10001 | 2023-07-16 10:08:22.000000 |           1 |                           Jark |     50.00000 |
    102 |        FALSE |
    | +I |       10002 | 2023-07-16 10:11:09.000000 |           2 |                          Sally |     15.00000 |
    105 |        FALSE |
    | +I |       10003 | 2023-07-16 10:11:09.000000 |           3 |                          Sally |     25.00000 |
    
    

リファレンス

AKS クラスター上の HDInsight 上の Apache Flink® コマンド ライン インターフェイス (CLI)