게이트웨이 모드에서 SQL 클라이언트 CLI 시작
중요하다
AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 에 대해 더욱 자세히 알고 싶다면 이 공지을 참고하세요.
워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.
중요하다
이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 미리 보기에 대한 추가 사용 약관에는 베타, 미리 보기 또는 아직 일반 공급으로 출시되지 않은 Azure 기능에 적용되는 더 많은 법적 조건이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 AKS용 Azure HDInsight 미리 보기 정보를 참조하세요. 질문 또는 기능 제안에 대한 자세한 내용은 AskHDInsight 대한 요청을 제출하고 Azure HDInsight Community 대한 자세한 업데이트를.
이 자습서에서는 AKS의 HDInsight에서 Apache Flink 클러스터 1.17.0의 게이트웨이 모드에서 SQL 클라이언트 CLI를 시작하는 방법을 안내합니다. 게이트웨이 모드에서 CLI는 SQL을 지정된 원격 게이트웨이에 제출하여 문을 실행합니다.
./bin/sql-client.sh gateway --endpoint <gateway address>
메모
AKS의 HDInsight에 있는 Apache Flink 클러스터에서 외부 연결은 443 포트를 통해 이동합니다. 그러나 내부적으로는 포트 8083을 수신 대기하는 sql-gateway 서비스로 요청을 다시 라우팅합니다.
AKS 쪽에서 SQL 게이트웨이 서비스를 확인합니다.
Flink의 SQL 클라이언트란?
Flink의 Table & SQL API를 사용하면 SQL 언어로 작성된 쿼리를 사용할 수 있지만 이러한 쿼리는 Java 또는 Scala로 작성된 테이블 프로그램에 포함해야 합니다. 또한 이러한 프로그램은 클러스터에 제출되기 전에 빌드 도구를 사용하여 패키지해야 합니다. 이 기능은 Flink를 Java/Scala 프로그래머로 제한합니다.
SQL 클라이언트는 한 줄의 Java 또는 Scala 코드 없이 Flink 클러스터에 테이블 프로그램을 쉽게 작성, 디버깅 및 제출하는 방법을 제공하는 것을 목표로 합니다. SQL 클라이언트 CLI를 사용하면 명령줄에서 실행 중인 분산 애플리케이션에서 실시간 결과를 검색하고 시각화할 수 있습니다.
자세한 내용은 webssh Flink SQL CLI 클라이언트를 입력하는 방법을참조하세요.
Flink의 SQL Gateway란?
SQL Gateway는 원격의 여러 클라이언트가 동시성으로 SQL을 실행할 수 있도록 하는 서비스입니다. Flink 작업을 제출하고, 메타데이터를 조회하고, 온라인으로 데이터를 분석하는 쉬운 방법을 제공합니다.
자세한 내용은 SQL Gateway참조하세요.
Flink-cli에서 게이트웨이 모드에서 SQL 클라이언트 CLI 시작
AKS의 HDInsight에 있는 Apache Flink 클러스터에서 다음 명령을 실행하여 게이트웨이 모드에서 SQL 클라이언트 CLI를 시작합니다.
./bin/sql-client.sh gateway --endpoint host:port
or
./bin/sql-client.sh gateway --endpoint https://fqdn/sql-gateway
Azure Portal에서 클러스터 엔드포인트(호스트 또는 fqdn)를 가져옵니다.
테스트
준비
Flink CLI 다운로드
- 로컬 Windows 컴퓨터의 https://aka.ms/hdionaksflink117clilinux Flink CLI를 다운로드합니다.
Linux용 Windows 하위 시스템을 설치하여 로컬 Windows 컴퓨터에서 이 작업을 수행합니다.
Windows 명령을 열고(JAVA_HOME 및 flink-cli 경로를 사용자 고유의 경로로 바꾸기) flink-cli를 다운로드합니다.
Windows Subsystem for Linux --distribution Ubuntu export JAVA_HOME=/mnt/c/Work/99_tools/zulu11.56.19-ca-jdk11.0.15-linux_x64 cd <folder> wget https://hdiconfigactions.blob.core.windows.net/hiloflink17blob/flink-cli.tgz tar -xvf flink-cli.tgz
flink-conf.yaml에서 엔드포인트, 테넌트 ID 및 포트 443 설정
user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli$ cd conf user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli/conf$ ls -l total 8 -rwxrwxrwx 1 user user 2451 Feb 26 20:33 flink-conf.yaml -rwxrwxrwx 1 user user 2946 Feb 23 14:13 log4j-cli.properties user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli/conf$ cat flink-conf.yaml rest.address: <flink cluster endpoint on Azure portal> azure.tenant.id: <tenant ID> rest.port: 443
AKS 클러스터 서브넷의 네트워크 보안 인바운드에서 VPN 활성화 상태로 포트 443을 이용하는 로컬 Windows 공용 IP를 HDInsight에 허용 목록에 추가합니다.
Flink-cli에서 게이트웨이 모드에서 Flink SQL로 sql-client.sh 실행합니다.
bin/sql-client.sh gateway --endpoint https://fqdn/sql-gateway
예
user@MININT-481C9TJ:/mnt/c/Users/user/flink-cli$ bin/sql-client.sh gateway --endpoint https://fqdn/sql-gateway ▒▓██▓██▒ ▓████▒▒█▓▒▓███▓▒ ▓███▓░░ ▒▒▒▓██▒ ▒ ░██▒ ▒▒▓▓█▓▓▒░ ▒████ ██▒ ░▒▓███▒ ▒█▒█▒ ░▓█ ███ ▓░▒██ ▓█ ▒▒▒▒▒▓██▓░▒░▓▓█ █░ █ ▒▒░ ███▓▓█ ▒█▒▒▒ ████░ ▒▓█▓ ██▒▒▒ ▓███▒ ░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░ ▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒ ███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒ ░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒ ███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░ ██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓ ▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒ ▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒ ▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█ ██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █ ▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓ █▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓ ██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓ ▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒ ██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒ ▓█ ▒█▓ ░ █░ ▒█ █▓ █▓ ██ █░ ▓▓ ▒█▓▓▓▒█░ █▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█ ██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓ ▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██ ░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓ ░▓██▒ ▓░ ▒█▓█ ░░▒▒▒ ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░ ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/user/.flink-sql-history
외부 원본을 사용하여 테이블을 쿼리하기 전에 관련 jar를 준비합니다. 다음은 Flink SQL의 mysql 테이블인 kafka 테이블을 쿼리하는 예제입니다. jar을 다운로드하여 Flink 클러스터에 연결한 Azure Data Lake Storage gen2 스토리지에 배치합니다.
Azure Portal의 Azure Data Lake Storage gen2에 있는 Jars:
이미 만든 테이블을 사용하여 관리를 위해 Hive 메타스토어에 넣은 다음 쿼리를 실행합니다.
메모
이 예제에서는 AKS의 기본 Azure Data Lake Storage Gen2에 있는 HDInsight의 모든 jar 파일이 사용됩니다. 컨테이너 및 스토리지 계정은 클러스터를 만드는 동안 지정한 것과 동일하지 않아도 됩니다. 필요한 경우 다른 스토리지 계정을 지정하고 클러스터 사용자 관리 ID에 Azure Data Lake Storage Gen2 쪽에서 스토리지 Blob 데이터 소유자 역할을 부여할 수 있습니다.
CREATE CATALOG myhive WITH ( 'type' = 'hive' ); USE CATALOG myhive; // ADD jar into environment ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-jdbc-3.1.0-1.17.jar'; ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/mysql-connector-j-8.0.33.jar'; ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/kafka-clients-3.2.0.jar'; ADD JAR 'abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-kafka-1.17.0.jar'; Flink SQL> show jars; ----------------------------------------------------------------------------------------------+ | jars | +----------------------------------------------------------------------------------------------+ | abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-kafka-1.17.0.jar | | abfs://<container>@<storage name>.dfs.core.windows.net/jar/flink-connector-jdbc-3.1.0-1.17.jar | | abfs://<container>@<storage name>.dfs.core.windows.net/jar/kafka-clients-3.2.0.jar | | abfs://<container>@<storage name>.dfs.core.windows.net/jar/mysql-connector-j-8.0.33.jar | +----------------------------------------------------------------------------------------------+ 4 rows in set Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau'; [INFO] Execute statement succeed. Flink SQL> show tables; +----------------------+ | table name | +----------------------+ | flightsintervaldata1 | | kafka_user_orders | | kafkatable | | mysql_user_orders | | orders | +----------------------+ 5 rows in set // mysql cdc table Flink SQL> select * from mysql_user_orders; +----+-------------+----------------------------+-------------+--------------------------------+--------------+-------------+--------------+ | op | order_id | order_date | customer_id | customer_name | price | product_id | order_status | +----+-------------+----------------------------+-------------+--------------------------------+--------------+-------------+--------------+ | +I | 10001 | 2023-07-16 10:08:22.000000 | 1 | Jark | 50.00000 | 102 | FALSE | | +I | 10002 | 2023-07-16 10:11:09.000000 | 2 | Sally | 15.00000 | 105 | FALSE | | +I | 10003 | 2023-07-16 10:11:09.000000 | 3 | Sally | 25.00000 |