ゲートウェイ モードで SQL クライアント CLI を起動する
大事な
AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 この発表 でについて詳しく知ることができます。
ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。
大事な
この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案がある場合は、AskHDInsight に詳細を記載してリクエストを送信し、さらにアップデートを希望される場合は Azure HDInsight Communityをフォローしてください。
このチュートリアルでは、AKS 上の HDInsight 上の Apache Flink Cluster 1.17.0 でゲートウェイ モードで SQL クライアント CLI を開始する方法について説明します。 ゲートウェイ モードでは、CLI は SQL を指定されたリモート ゲートウェイに送信してステートメントを実行します。
./bin/sql-client.sh gateway --endpoint <gateway address>
手記
AKS 上の HDInsight 上の Apache Flink クラスターでは、外部接続はすべて 443 ポート経由で行われます。 ただし、内部的には、ポート 8083 をリッスンしている sql-gateway サービスに要求が再ルーティングされます。
AKS 側で SQL ゲートウェイ サービスを確認します。
Flink の SQL クライアントとは
Flink の Table & SQL API を使用すると、SQL 言語で記述されたクエリを操作できますが、これらのクエリは Java または Scala で記述されたテーブル プログラム内に埋め込む必要があります。 さらに、これらのプログラムは、クラスターに送信される前にビルド ツールでパッケージ化する必要があります。 この機能により、Flink の使用が Java/Scala プログラマに制限されます。
SQL クライアントは、Java または Scala コードを 1 行も使用せずに、テーブル プログラムを記述、デバッグ、Flink クラスターに送信する簡単な方法を提供することを目的としています。 SQL クライアント CLI を使用すると、コマンド ラインで実行中の分散アプリケーションからリアルタイムの結果を取得して視覚化できます。
詳細については、webssh で Flink SQL CLI クライアントを入力する方法のを参照してください。
Flink の SQL ゲートウェイとは
SQL ゲートウェイは、リモートから複数のクライアントがコンカレンシーで SQL を実行できるようにするサービスです。 Flink ジョブを送信し、メタデータを検索し、データをオンラインで分析する簡単な方法が提供されます。
詳細については、「SQL Gateway」を参照してください。
Flink-cli でゲートウェイ モードで SQL クライアント CLI を起動する
AKS 上の HDInsight 上の Apache Flink クラスターで、次のコマンドを実行して、ゲートウェイ モードで SQL クライアント CLI を開始します。
./bin/sql-client.sh gateway --endpoint host:port
or
./bin/sql-client.sh gateway --endpoint https://fqdn/sql-gateway
Azure portal でクラスター エンドポイント (ホストまたは fqdn) を取得します。
テスティング
準備
Flink CLI のダウンロード
- ローカル Windows マシンの https://aka.ms/hdionaksflink117clilinux から Flink CLI をダウンロードします。
Linux 用 Windows サブシステムをインストールして、ローカルの Windows マシンでこの機能を実現します。
Windows コマンドを開き(JAVA_HOMEと flink-cli のパスを独自のパスに置き換えます)、flink-cli をダウンロードします。
Windows Subsystem for Linux --distribution Ubuntu export JAVA_HOME=/mnt/c/Work/99_tools/zulu11.56.19-ca-jdk11.0.15-linux_x64 cd <folder> wget https://hdiconfigactions.blob.core.windows.net/hiloflink17blob/flink-cli.tgz tar -xvf flink-cli.tgz
flink-conf.yaml でエンドポイント、テナント ID、ポート 443 を設定する
user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli$ cd conf user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli/conf$ ls -l total 8 -rwxrwxrwx 1 user user 2451 Feb 26 20:33 flink-conf.yaml -rwxrwxrwx 1 user user 2946 Feb 23 14:13 log4j-cli.properties user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli/conf$ cat flink-conf.yaml rest.address: <flink cluster endpoint on Azure portal> azure.tenant.id: <tenant ID> rest.port: 443
AKS クラスターのサブネット内で、VPN が有効になっている HDInsight に、ポート 443 経由でアクセスする Windows のローカルパブリック IP をネットワークセキュリティの受信許可リストに登録します。
Flink-cli から Flink SQL へのゲートウェイ モードで sql-client.sh を実行します。
bin/sql-client.sh gateway --endpoint https://fqdn/sql-gateway
例
user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli$ bin/sql-client.sh gateway --endpoint https://fqdn/sql-gateway ▒▓██▓██▒ ▓████▒▒█▓▒▓███▓▒ ▓███▓░░ ▒▒▒▓██▒ ▒ ░██▒ ▒▒▓▓█▓▓▒░ ▒████ ██▒ ░▒▓███▒ ▒█▒█▒ ░▓█ ███ ▓░▒██ ▓█ ▒▒▒▒▒▓██▓░▒░▓▓█ █░ █ ▒▒░ ███▓▓█ ▒█▒▒▒ ████░ ▒▓█▓ ██▒▒▒ ▓███▒ ░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░ ▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒ ███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒ ░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒ ███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░ ██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓ ▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒ ▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒ ▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█ ██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █ ▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓ █▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓ ██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓ ▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒ ██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒ ▓█ ▒█▓ ░ █░ ▒█ █▓ █▓ ██ █░ ▓▓ ▒█▓▓▓▒█░ █▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█ ██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓ ▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██ ░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓ ░▓██▒ ▓░ ▒█▓█ ░░▒▒▒ ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░ ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/user/.flink-sql-history
外部ソースを使用してテーブルにクエリを実行する前に、関連する jar を準備します。 次の例では、Flink SQL の kafka テーブル、mysql テーブルのクエリを実行します。 jar をダウンロードし、Azure Data Lake Storage gen2 ストレージに接続された Flink クラスターに配置します。
Azure ポータルでの Azure Data Lake Storage gen2 のジャー:
既に作成されているテーブルを使用し、管理のために Hive メタストアに配置してから、クエリを実行します。
手記
この例では、AKS 上の HDInsight のすべての jar が既定の Azure Data Lake Storage Gen2 です。 コンテナーとストレージ アカウントは、クラスターの作成時に指定したアカウントと同じである必要はありません。 必要に応じて、別のストレージ アカウントを指定し、クラスター ユーザーマネージド ID に Azure Data Lake Storage Gen2 側のストレージ BLOB データ所有者ロールを付与できます。
CREATE CATALOG myhive WITH ( 'type' = 'hive' ); USE CATALOG myhive; // ADD jar into environment ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-jdbc-3.1.0-1.17.jar'; ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/mysql-connector-j-8.0.33.jar'; ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/kafka-clients-3.2.0.jar'; ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-kafka-1.17.0.jar'; Flink SQL> show jars; ----------------------------------------------------------------------------------------------+ | jars | +----------------------------------------------------------------------------------------------+ | abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-kafka-1.17.0.jar | | abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-jdbc-3.1.0-1.17.jar | | abfs://<container>@<storage name>.dfs.core.windows.net/jar/kafka-clients-3.2.0.jar | | abfs://<container>@<storage name>.dfs.core.windows.net/jar/mysql-connector-j-8.0.33.jar | +----------------------------------------------------------------------------------------------+ 4 rows in set Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; [INFO] Execute statement succeed. Flink SQL> show tables; +----------------------+ | table name | +----------------------+ | flightsintervaldata1 | | kafka_user_orders | | kafkatable | | mysql_user_orders | | orders | +----------------------+ 5 rows in set // mysql cdc table Flink SQL> select * from mysql_user_orders; +----+-------------+----------------------------+-------------+--------------------------------+--------------+-------------+--------------+ | op | order_id | order_date | customer_id | customer_name | price | product_id | order_status | +----+-------------+----------------------------+-------------+--------------------------------+--------------+-------------+--------------+ | +I | 10001 | 2023-07-16 10:08:22.000000 | 1 | Jark | 50.00000 | 102 | FALSE | | +I | 10002 | 2023-07-16 10:11:09.000000 | 2 | Sally | 15.00000 | 105 | FALSE | | +I | 10003 | 2023-07-16 10:11:09.000000 | 3 | Sally | 25.00000 |
参考
AKS クラスター上の HDInsight における Apache Flink® インターフェース (CLI) Command-Line