次の方法で共有


AKS 上の HDInsight クラスターの Apache Flink® における Hive 方言

大事な

AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 この発表 についてさらに知ることができます。

ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。

大事な

この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案がある場合は、AskHDInsight に詳細を記載してリクエストを送信してください。また、Azure HDInsight Community をフォローして、詳細な更新情報を受け取ってください。

この記事では、AKS 上の HDInsight 上の Apache Flink クラスターで Hive 言語を使用する方法について説明します。

紹介

ユーザーは、AKS クラスター上の HDInsight での使用のために、既定の flink 方言を Hive 言語に変更することはできません。 すべての SQL 操作は、次のエラーで Hive 言語に変更されると失敗します。


*java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader can't be cast to class java.net.URLClassLoader*

この問題は、Hive Jiraが開いているために起こります。 現在、Hive では、システム クラス ローダーが URLClassLoader のインスタンスであると想定しています。 Java 11では、この前提は当てはまるわけではありません。

  • webssh で次の手順実行します。

    1. lib の場所にある既存の flink-sql-connector-hive*jar を削除する
      rm /opt/flink-webssh/lib/flink-sql-connector-hive*jar
      
    2. 次の jar を webssh ポッドにダウンロードし、/opt/flink-webssh/lib wget https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader/1.17.0の下に追加します。 (上記の hive jar には修正 https://issues.apache.org/jira/browse/HIVE-27508があります)
    mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
    mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
    
    1. flink 構成管理の [core-site.xml] セクションに、次のキーを追加します。
      fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      flink.hadoop.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      
  • hive-dialect クエリ の概要を次に示します。

    • パーティション分割なしで Flink で Hive 言語を実行する
      root [ ~ ]# ./bin/sql-client.sh
      Flink SQL>
      Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf');
      [INFO] Execute statement succeed.
    
      Flink SQL> use catalog myhive;
      [INFO] Execute statement succeed.
    
      Flink SQL> load module hive;
      [INFO] Execute statement succeed.
    
      Flink SQL> use modules hive,core;
      [INFO] Execute statement succeed.
    
      Flink SQL> set table.sql-dialect=hive;
      [INFO] Session property has been set.
    
      Flink SQL> set sql-client.execution.result-mode=tableau;
      [INFO] Session property has been set.
    
      Flink SQL> select explode(array(1,2,3));Hive Session ID = 6ba45be2-360e-4bee-8842-2765c91581c8
    
    
    > [!WARNING]
    > An illegal reflective access operation has occurred
    
    > [!WARNING]
    > Illegal reflective access by org.apache.hadoop.hive.common.StringInternUtils (file:/opt/flink-webssh/lib/flink-sql-connector-hive-3.1.2_2.12-1.16-SNAPSHOT.jar) to field java.net.URI.string
    
    > [!WARNING]
    > Please consider reporting this to the maintainers of org.apache.hadoop.hive.common.StringInternUtils
    
    > [!WARNING]
    > `Use --illegal-access=warn` to enable warnings of further illegal reflective access operations
    
    > [!WARNING]
    >  All illegal access operations will be denied in a future release
    select explode(array(1,2,3));
    
    
    +----+-------------+
    | op |         col |
    +----+-------------+
    | +I |           1 |
    | +I |           2 |
    | +I |           3 |
    +----+-------------+
    
    Received a total of 3 rows
    
    Flink SQL> create table tttestHive Session ID = fb8b652a-8dad-4781-8384-0694dc16e837
    
    [INFO] Execute statement succeed.
    
    Flink SQL> insert into table tttestHive Session ID = f239dc6f-4b58-49f9-ad02-4c73673737d8),(3,'c'),(4,'d');
    
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: d0542da4c4252f9494298666ff4e9f8e
    
    Flink SQL> set execution.runtime-mode=batch;
    [INFO] Session property has been set.
    
    Flink SQL> select * from tttestHive Session ID = 61b6eb3b-90a6-499c-aced-0598366c5b31
    
    +-----+-------+
    | key | value |
    +-----+-------+
    |   1 |     a |
    |   1 |     a |
    |   2 |     b |
    |   3 |     c |
    |   3 |     c |
    |   3 |     c |
    |   4 |     d |
    |   5 |     e |
    +-----+-------+
    8 rows in set
    
    Flink SQL> QUIT;Hive Session ID = 2dadad92-436e-426e-a88c-66eafd740d98
    
    [INFO] Exiting Flink SQL CLI Client...
    
    Shutting down the session...
    done.
    root [ ~ ]# exit
    

    データは、hive/warehouse ディレクトリで構成されたのと同じコンテナーに書き込まれます。

    スクリーンショットは、コンテナー テーブル 1 を示しています。

    • パーティションを使用した Flink での Hive 言語の実行
  create table tblpart2 (key int, value string) PARTITIONED by ( part string ) tblproperties ('sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');

  insert into table tblpart2 Hive Session ID = 78fae85f-a451-4110-bea6-4aa1c172e282),(2,'b','d'),(3,'c','d'),(3,'c','a'),(4,'d','e');

スクリーンショットは、コンテナー テーブル 2 を示しています。

スクリーンショットは、コンテナー テーブル 3 を示しています。

参考

  • Apache Flink における Hive ダイアレクト
  • Apache、Apache Flink、Flink、および関連するオープン ソース プロジェクト名は、Apache Software Foundation (ASF) の 商標です。