次の方法で共有


ゲートウェイ モードで SQL クライアント CLI を起動する

大事な

AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 この発表 について詳しく知ることができます。

ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。

大事な

この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案がある場合は、AskHDInsight に詳細を記載してリクエストを送信し、さらにアップデートを希望される場合は Azure HDInsight Communityをフォローしてください。

このチュートリアルでは、AKS 上の HDInsight 上の Apache Flink Cluster 1.17.0 でゲートウェイ モードで SQL クライアント CLI を開始する方法について説明します。 ゲートウェイ モードでは、CLI は SQL を指定されたリモート ゲートウェイに送信してステートメントを実行します。

./bin/sql-client.sh gateway --endpoint <gateway address>

手記

AKS 上の HDInsight 上の Apache Flink クラスターでは、外部接続はすべて 443 ポート経由で行われます。 ただし、内部的には、ポート 8083 をリッスンしている sql-gateway サービスに要求が再ルーティングされます。

AKS 側で SQL ゲートウェイ サービスを確認します。

SQL ゲートウェイを確認する方法を示すスクリーンショット。

Flink の Table & SQL API を使用すると、SQL 言語で記述されたクエリを操作できますが、これらのクエリは Java または Scala で記述されたテーブル プログラム内に埋め込む必要があります。 さらに、これらのプログラムは、クラスターに送信される前にビルド ツールでパッケージ化する必要があります。 この機能により、Flink の使用が Java/Scala プログラマに制限されます。

SQL クライアントは、Java または Scala コードを 1 行も使用せずに、テーブル プログラムを記述、デバッグ、Flink クラスターに送信する簡単な方法を提供することを目的としています。 SQL クライアント CLI を使用すると、コマンド ラインで実行中の分散アプリケーションからリアルタイムの結果を取得して視覚化できます。

詳細については、webssh で Flink SQL CLI クライアントを入力する方法のを参照してください。

SQL ゲートウェイは、リモートから複数のクライアントがコンカレンシーで SQL を実行できるようにするサービスです。 Flink ジョブを送信し、メタデータを検索し、データをオンラインで分析する簡単な方法が提供されます。

詳細については、「SQL Gateway」を参照してください。

AKS 上の HDInsight 上の Apache Flink クラスターで、次のコマンドを実行して、ゲートウェイ モードで SQL クライアント CLI を開始します。

./bin/sql-client.sh gateway --endpoint host:port
 
or
 
./bin/sql-client.sh gateway --endpoint https://fqdn/sql-gateway

Azure portal でクラスター エンドポイント (ホストまたは fqdn) を取得します。

クラスター エンドポイントを示すスクリーンショット。

テスティング

準備

  1. ローカル Windows マシンの https://aka.ms/hdionaksflink117clilinux から Flink CLI をダウンロードします。

Linux 用 Windows サブシステムをインストールして、ローカルの Windows マシンでこの機能を実現します。

  1. Windows コマンドを開き(JAVA_HOMEと flink-cli のパスを独自のパスに置き換えます)、flink-cli をダウンロードします。

    Windows Subsystem for Linux --distribution Ubuntu 
    export JAVA_HOME=/mnt/c/Work/99_tools/zulu11.56.19-ca-jdk11.0.15-linux_x64
    cd <folder>
    wget https://hdiconfigactions.blob.core.windows.net/hiloflink17blob/flink-cli.tgz
    tar -xvf flink-cli.tgz
    
  2. flink-conf.yaml でエンドポイント、テナント ID、ポート 443 を設定する

    user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli$ cd conf
    user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli/conf$ ls -l
    total 8
    -rwxrwxrwx 1 user user 2451 Feb 26 20:33 flink-conf.yaml
    -rwxrwxrwx 1 user user 2946 Feb 23 14:13 log4j-cli.properties
    
    user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli/conf$ cat flink-conf.yaml
    
    rest.address: <flink cluster endpoint on Azure portal>
    azure.tenant.id: <tenant ID>
    rest.port: 443
    
  3. AKS クラスターのサブネット内で、VPN が有効になっている HDInsight に、ポート 443 経由でアクセスする Windows のローカルパブリック IP をネットワークセキュリティの受信許可リストに登録します。

    パブリック IP アドレスを許可する方法を示すスクリーンショット。

  4. Flink-cli から Flink SQL へのゲートウェイ モードで sql-client.sh を実行します。

    bin/sql-client.sh gateway --endpoint https://fqdn/sql-gateway
    

    user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli$ bin/sql-client.sh gateway --endpoint https://fqdn/sql-gateway
    
                                       ▒▓██▓██▒
                                   ▓████▒▒█▓▒▓███▓▒
                                ▓███▓░░        ▒▒▒▓██▒  ▒
                              ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                              ██▒         ░▒▓███▒    ▒█▒█▒
                                ░▓█            ███   ▓░▒██
                                  ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                                █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                                ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                             ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                       ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                      ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                    ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
                   ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
                  ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
               ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
               ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
               ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
               ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
              ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
              █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
              ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
              ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
               ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
               ▓█   ▒█▓   ░     █░                ▒█              █▓
                █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
                 █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
                  ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
                   ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                    ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                      ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                          ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░
    
        ______ _ _       _       _____  ____  _         _____ _ _            _  BETA
       |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
       | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
       |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
       | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
       |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
    
            Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/user/.flink-sql-history
    
  5. 外部ソースを使用してテーブルにクエリを実行する前に、関連する jar を準備します。 次の例では、Flink SQL の kafka テーブル、mysql テーブルのクエリを実行します。 jar をダウンロードし、Azure Data Lake Storage gen2 ストレージに接続された Flink クラスターに配置します。

    Azure ポータルでの Azure Data Lake Storage gen2 のジャー:

    Azure portal の jar ファイルを示すスクリーンショット。

  6. 既に作成されているテーブルを使用し、管理のために Hive メタストアに配置してから、クエリを実行します。

    手記

    この例では、AKS 上の HDInsight のすべての jar が既定の Azure Data Lake Storage Gen2 です。 コンテナーとストレージ アカウントは、クラスターの作成時に指定したアカウントと同じである必要はありません。 必要に応じて、別のストレージ アカウントを指定し、クラスター ユーザーマネージド ID に Azure Data Lake Storage Gen2 側のストレージ BLOB データ所有者ロールを付与できます。

    CREATE CATALOG myhive WITH (
        'type' = 'hive'
    );
    
    USE CATALOG myhive;
    
    // ADD jar into environment
    ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-jdbc-3.1.0-1.17.jar';
    ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/mysql-connector-j-8.0.33.jar';
    ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/kafka-clients-3.2.0.jar';
    ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-kafka-1.17.0.jar';
    
    Flink SQL> show jars;
    ----------------------------------------------------------------------------------------------+
    |                                                                                         jars |
    +----------------------------------------------------------------------------------------------+
    |    abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-kafka-1.17.0.jar |
    | abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-jdbc-3.1.0-1.17.jar |
    |             abfs://<container>@<storage name>.dfs.core.windows.net/jar/kafka-clients-3.2.0.jar |
    |        abfs://<container>@<storage name>.dfs.core.windows.net/jar/mysql-connector-j-8.0.33.jar |
    +----------------------------------------------------------------------------------------------+
    4 rows in set
    
    Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
    [INFO] Execute statement succeed.
    
    Flink SQL> show tables;
    +----------------------+
    |           table name |
    +----------------------+
    | flightsintervaldata1 |
    |    kafka_user_orders |
    |           kafkatable |
    |    mysql_user_orders |
    |               orders |
    +----------------------+
    5 rows in set
    
    // mysql cdc table
    Flink SQL> select * from mysql_user_orders;
    +----+-------------+----------------------------+-------------+--------------------------------+--------------+-------------+--------------+
    | op |    order_id |                 order_date | customer_id |                  customer_name |        price |  product_id | order_status |
    +----+-------------+----------------------------+-------------+--------------------------------+--------------+-------------+--------------+
    | +I |       10001 | 2023-07-16 10:08:22.000000 |           1 |                           Jark |     50.00000 |
    102 |        FALSE |
    | +I |       10002 | 2023-07-16 10:11:09.000000 |           2 |                          Sally |     15.00000 |
    105 |        FALSE |
    | +I |       10003 | 2023-07-16 10:11:09.000000 |           3 |                          Sally |     25.00000 |
    
    

参考

AKS クラスター上の HDInsight における Apache Flink® インターフェース (CLI) Command-Line