Apache Flink® DataStream API で Hive メタストアを使用する
大事な
AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 このお知らせ で、についてさらに詳しく知ってください。
ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。
大事な
この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能提案がありましたら、AskHDInsight に詳細を記載したリクエストを送信し、Azure HDInsight Communityをフォローしてさらなる更新情報をお受け取りください。
長年にわたり、Hive Metastore は Hadoop エコシステムの事実上のメタデータ センターに進化してきました。 多くの企業は、すべてのメタデータ (Hive または Hive 以外のメタデータ) を管理するために、運用環境に個別の Hive Metastore サービス インスタンスを持っています。 Hive と Flink の両方のデプロイを持つユーザーの場合、HiveCatalog を使用して Hive Metastore を使用して Flink のメタデータを管理できます。
AKS 上の HDInsight 上の Apache Flink クラスターでサポートされている Hive バージョン
サポートされている Hive バージョン:
- 3.1
- 3.1.0
- 3.1.1
- 3.1.2
- 3.1.3
独自のプログラムをビルドする場合は、mvn ファイルに次の依存関係が必要です。 これらの依存関係 結果の jar ファイルに含める は推奨されません。 実行時に依存関係を追加する必要があります。
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Hive に接続する
この例では、Hive に接続するさまざまなスニペットを示しています。AKS 上の HDInsight で Apache Flink を使用する場合は、hive 構成ディレクトリとして /opt/hive-conf
を使用して Hive メタストアに接続する必要があります
package contoso.example;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class hiveDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start Table Environment
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
String catalogName = "myhive";
String defaultDatabase = HiveCatalog.DEFAULT_DB;
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
// register HiveCatalog in the tableEnv
tableEnv.registerCatalog("myhive", hive);
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
// Create a table in hive catalog
tableEnv.executeSql("create table MyTable (name varchar(32), age int) with ('connector' = 'filesystem', 'path' = 'abfs://flink@contosogen2.dfs.core.windows.net/data/', 'format' = 'csv','csv.field-delimiter' = ',')");
// Create a view in hive catalog
tableEnv.executeSql("create view MyView as select * from MyTable");
// Read from the table and print the results
tableEnv.from("MyTable").execute().print();
// 4. run stream
env.execute("Hive Demo on Flink");
}
}
Webssh ポッドで、Planner jar を移動します
webssh ポッドの /opt to /lib
にある jar flink-table-planner-loader-1.17.0-*.*.*.jar
を移動し、lib
から jar flink-table-planner-loader-1.17.0-*.*.*.jar
を取り出します。 詳細については、問題を参照してください。 Planner jar を移動するには、次の手順を実行します。
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-1.1.8.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-1.1.8.jar /opt/flink-webssh/lib/
手記
追加の Planner jar の移動は、Hive 言語または HiveServer2 エンドポイントを使用する場合にのみ必要です。 ただし、これは Hive 統合に推奨されるセットアップです。
詳細については、「AKS 上の HDInsight 上の Apache Flink® で Hive カタログを使用する方法」を参照してください。
jar をパッケージ化して Webssh にアップロードして実行する
user@sshnode-0 [ ~ ]$ bin/flink run -c contoso.example.hiveDemo -j FlinkSQLServerCDCDemo-1.0-SNAPSHOT.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID 5c887e1f8e1bfac501168c439a83788f
+----+--------------------------------+-------------+
| op | name | age |
+----+--------------------------------+-------------+
| +I | Jack | 18 |
| +I | mike | 24 |
+----+--------------------------------+-------------+
2 rows in set
Flink UI で実行されている JOB を確認する
sql-client.sh
を使用して Webssh UI のテーブルを確認する
user@sshnode-0 [ ~ ]$ bin/sql-client.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
????????
????????????????
??????? ??????? ?
???? ????????? ?????
??? ??????? ?????
??? ??? ?????
?? ???????????????
?? ? ??? ?????? ?????
????? ???? ????? ?????
??????? ??? ??????? ???
????????? ?? ?? ??????????
???????? ?? ? ?? ???????
???? ??? ? ?? ???????? ?????
???? ? ?? ? ?? ???????? ???? ??
???? ???? ?????????? ??? ?? ????
???? ?? ??? ??????????? ???? ? ? ???
??? ?? ??? ????????? ???? ???
?? ? ??????? ???????? ??? ??
??? ??? ???????????????????? ???? ?
????? ??? ?????? ???????? ???? ??
???????? ??????????????? ??
?? ???? ??????? ??? ?????? ?? ???
??? ??? ??? ??????? ???? ?????????????
??? ????? ???? ?? ?? ???? ???
?? ??? ? ?? ?? ??
?? ?? ?? ?? ????????
?? ????? ?? ??????????? ??
?? ???? ? ??????? ??
??? ????? ?? ???????????
???? ???? ??????? ????????
????? ?? ???? ?????
????????????????????????????????? ?????
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /home/xcao/.flink-sql-history
Flink SQL> CREATE CATALOG myhive WITH (
> 'type' = 'hive'
> );
[INFO] Execute statement succeed.
Flink SQL> USE CATALOG myhive;
[INFO] Execute statement succeed.
Flink SQL> show tables
> ;
+------------+
| table name |
+------------+
| mytable |
| myview |
+------------+
2 rows in set
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.
Flink SQL> select * from mytable;
+----+--------------------------------+-------------+
| op | name | age |
+----+--------------------------------+-------------+
| +I | Jack | 18 |
| +I | mike | 24 |
+----+--------------------------------+-------------+
Received a total of 2 rows
参照
- Hive 読み取り & 書き込み の
- Apache、Apache Hive、Hive、Apache Flink、Flink、および関連するオープン ソース プロジェクト名は、Apache Software Foundation (ASF) の 商標です。