AKS에서 HDInsight를 통해 운영되는 Apache Flink® 클러스터의 Hive 방언
중요하다
AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 이 발표 을 통해에 대해 자세히 알아보세요.
워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.
중요하다
이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 프리뷰에 대한 추가 사용 약관에는 베타, 프리뷰 또는 일반 공급으로 아직 릴리스되지 않은 Azure 기능에 적용되는 더 많은 법적 조건이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보을 참조하세요. 질문이나 기능 제안을 하시려면, 세부 사항을 포함하여 AskHDInsight에 요청을 제출하십시오. 보다 많은 업데이트를 확인하려면 Azure HDInsight Community를 팔로우하세요.
이 문서에서는 AKS의 HDInsight에 있는 Apache Flink 클러스터에서 Hive 방언을 사용하는 방법을 알아봅니다.
소개
사용자는 AKS 클러스터의 HDInsight에서 사용하기 위해 기본 flink
방언을 hive 언어로 변경할 수 없습니다. 모든 SQL 작업은 다음 오류와 함께 hive 언어로 변경되면 실패합니다.
*java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader can't be cast to class java.net.URLClassLoader*
이 문제의 원인은 이 열린 Hive Jira때문입니다. 현재 Hive는 시스템 클래스 로더가 URLClassLoader의 인스턴스라고 가정합니다.
Java 11
에서는 이 가정이 적용되지 않습니다.
Flink에서 Hive 방언을 사용하는 방법
webssh에서 다음 단계를 실행하십시오:
- lib 위치에서 기존 flink-sql-connector-hive*jar 제거
rm /opt/flink-webssh/lib/flink-sql-connector-hive*jar
-
webssh
Pod에서 다음 jar를 다운로드하고 /opt/flink-webssh/lib wget https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader/1.17.0아래에 추가합니다. (위에 있는 hive jar에는 https://issues.apache.org/jira/browse/HIVE-27508수정이 적용되었습니다.)
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/ mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
- core-site.xml 섹션의
flink
구성 관리에 다음 키를 추가합니다.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY> flink.hadoop.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
- lib 위치에서 기존 flink-sql-connector-hive*jar 제거
-
- 분할하지 않고 Flink에서 Hive 방언 실행
root [ ~ ]# ./bin/sql-client.sh Flink SQL> Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf'); [INFO] Execute statement succeed. Flink SQL> use catalog myhive; [INFO] Execute statement succeed. Flink SQL> load module hive; [INFO] Execute statement succeed. Flink SQL> use modules hive,core; [INFO] Execute statement succeed. Flink SQL> set table.sql-dialect=hive; [INFO] Session property has been set. Flink SQL> set sql-client.execution.result-mode=tableau; [INFO] Session property has been set. Flink SQL> select explode(array(1,2,3));Hive Session ID = 6ba45be2-360e-4bee-8842-2765c91581c8 > [!WARNING] > An illegal reflective access operation has occurred > [!WARNING] > Illegal reflective access by org.apache.hadoop.hive.common.StringInternUtils (file:/opt/flink-webssh/lib/flink-sql-connector-hive-3.1.2_2.12-1.16-SNAPSHOT.jar) to field java.net.URI.string > [!WARNING] > Please consider reporting this to the maintainers of org.apache.hadoop.hive.common.StringInternUtils > [!WARNING] > `Use --illegal-access=warn` to enable warnings of further illegal reflective access operations > [!WARNING] > All illegal access operations will be denied in a future release select explode(array(1,2,3)); +----+-------------+ | op | col | +----+-------------+ | +I | 1 | | +I | 2 | | +I | 3 | +----+-------------+ Received a total of 3 rows Flink SQL> create table tttestHive Session ID = fb8b652a-8dad-4781-8384-0694dc16e837 [INFO] Execute statement succeed. Flink SQL> insert into table tttestHive Session ID = f239dc6f-4b58-49f9-ad02-4c73673737d8),(3,'c'),(4,'d'); [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: d0542da4c4252f9494298666ff4e9f8e Flink SQL> set execution.runtime-mode=batch; [INFO] Session property has been set. Flink SQL> select * from tttestHive Session ID = 61b6eb3b-90a6-499c-aced-0598366c5b31 +-----+-------+ | key | value | +-----+-------+ | 1 | a | | 1 | a | | 2 | b | | 3 | c | | 3 | c | | 3 | c | | 4 | d | | 5 | e | +-----+-------+ 8 rows in set Flink SQL> QUIT;Hive Session ID = 2dadad92-436e-426e-a88c-66eafd740d98 [INFO] Exiting Flink SQL CLI Client... Shutting down the session... done. root [ ~ ]# exit
데이터는 hive/warehouse 디렉터리에 구성된 동일한 컨테이너에 기록됩니다.
- 파티션을 사용하여 Flink에서 Hive 방언 실행
create table tblpart2 (key int, value string) PARTITIONED by ( part string ) tblproperties ('sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
insert into table tblpart2 Hive Session ID = 78fae85f-a451-4110-bea6-4aa1c172e282),(2,'b','d'),(3,'c','d'),(3,'c','a'),(4,'d','e');
참조
- Apache Flink에서의 Hive 언어
- Apache, Apache Flink, Flink 및 관련 오픈 소스 프로젝트 이름은 asF(Apache Software Foundation)의 상표입니다.