Použití metastoru Hive s rozhraním Apache Flink® DataStream API
Důležitý
Azure HDInsight v AKS byl vyřazen 31. ledna 2025. Další informace s tímto oznámením.
Abyste se vyhnuli náhlému ukončení úloh, musíte migrovat úlohy do Microsoft Fabric nebo ekvivalentního produktu Azure.
Důležitý
Tato funkce je aktuálně ve verzi Preview. Doplňkové podmínky použití pro předběžné verze Microsoft Azure obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta verzi, ve fázi Preview nebo ještě nebyly vydány ve veřejné dostupnosti. Informace o této konkrétní preview verzi naleznete ve Azure HDInsight na AKS preview informacích. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás pro další aktualizace v komunitě Azure HDInsight .
V průběhu let se Hive Metastore stal de facto centrem metadat v ekosystému Hadoop. Mnoho společností má ve svých produkčních prostředích samostatnou instanci služby Metastore Hive, která spravují všechna metadata (Hive nebo jiná metadata než Hive). Pro uživatele, kteří mají nasazení Hive i Flink, jim HiveCatalog umožňuje spravovat metadata Flinku pomocí metastoru Hive.
Podporované verze Hivu pro clustery Apache Flink ve službě HDInsight v AKS
Podporovaná verze Hivu:
- 3.1
- 3.1.0
- 3.1.1
- 3.1.2
- 3.1.3
Pokud vytváříte vlastní program, potřebujete v souboru mvn následující závislosti. Ve výsledném souboru JAR se nedoporučuje zahrnout tyto závislosti. Měli byste přidat závislosti za běhu.
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Připojení k Hivu
Tento příklad ukazuje různé fragmenty kódu připojení k hivu pomocí Apache Flinku ve službě HDInsight v AKS, musíte použít /opt/hive-conf
jako konfigurační adresář Hive pro připojení k metastoru Hive.
package contoso.example;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class hiveDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start Table Environment
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
String catalogName = "myhive";
String defaultDatabase = HiveCatalog.DEFAULT_DB;
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
// register HiveCatalog in the tableEnv
tableEnv.registerCatalog("myhive", hive);
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
// Create a table in hive catalog
tableEnv.executeSql("create table MyTable (name varchar(32), age int) with ('connector' = 'filesystem', 'path' = 'abfs://flink@contosogen2.dfs.core.windows.net/data/', 'format' = 'csv','csv.field-delimiter' = ',')");
// Create a view in hive catalog
tableEnv.executeSql("create view MyView as select * from MyTable");
// Read from the table and print the results
tableEnv.from("MyTable").execute().print();
// 4. run stream
env.execute("Hive Demo on Flink");
}
}
Na podu Webssh přesuňte planner jar.
Přesuňte soubor JAR flink-table-planner-loader-1.17.0-*.*.*.jar
umístěný v /opt to /lib
podu webssh a přesuňte flink-table-planner-loader-1.17.0-*.*.*.jar
jar z lib
. Další podrobnosti najdete v tématu o problému. Pomocí následujících kroků přesuňte soubor JAR plánovače.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-1.1.8.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-1.1.8.jar /opt/flink-webssh/lib/
Poznámka
Přesunutí dalšího plánovacího souboru JAR je potřeba jen v případě, že používáte dialekt Hive nebo koncový bod HiveServer2. Toto je ale doporučené nastavení pro integraci Hive.
Další informace najdete v tématu Použití katalogu Hive s Apache Flink® ve službě HDInsight ve službě AKS
Zabalte soubor JAR a nahrajte ho do webssh a spusťte
user@sshnode-0 [ ~ ]$ bin/flink run -c contoso.example.hiveDemo -j FlinkSQLServerCDCDemo-1.0-SNAPSHOT.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID 5c887e1f8e1bfac501168c439a83788f
+----+--------------------------------+-------------+
| op | name | age |
+----+--------------------------------+-------------+
| +I | Jack | 18 |
| +I | mike | 24 |
+----+--------------------------------+-------------+
2 rows in set
Kontrola úlohy spuštěné v uživatelském rozhraní Flink
Kontrola tabulky v uživatelském rozhraní Webssh prostřednictvím sql-client.sh
user@sshnode-0 [ ~ ]$ bin/sql-client.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
????????
????????????????
??????? ??????? ?
???? ????????? ?????
??? ??????? ?????
??? ??? ?????
?? ???????????????
?? ? ??? ?????? ?????
????? ???? ????? ?????
??????? ??? ??????? ???
????????? ?? ?? ??????????
???????? ?? ? ?? ???????
???? ??? ? ?? ???????? ?????
???? ? ?? ? ?? ???????? ???? ??
???? ???? ?????????? ??? ?? ????
???? ?? ??? ??????????? ???? ? ? ???
??? ?? ??? ????????? ???? ???
?? ? ??????? ???????? ??? ??
??? ??? ???????????????????? ???? ?
????? ??? ?????? ???????? ???? ??
???????? ??????????????? ??
?? ???? ??????? ??? ?????? ?? ???
??? ??? ??? ??????? ???? ?????????????
??? ????? ???? ?? ?? ???? ???
?? ??? ? ?? ?? ??
?? ?? ?? ?? ????????
?? ????? ?? ??????????? ??
?? ???? ? ??????? ??
??? ????? ?? ???????????
???? ???? ??????? ????????
????? ?? ???? ?????
????????????????????????????????? ?????
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /home/xcao/.flink-sql-history
Flink SQL> CREATE CATALOG myhive WITH (
> 'type' = 'hive'
> );
[INFO] Execute statement succeed.
Flink SQL> USE CATALOG myhive;
[INFO] Execute statement succeed.
Flink SQL> show tables
> ;
+------------+
| table name |
+------------+
| mytable |
| myview |
+------------+
2 rows in set
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.
Flink SQL> select * from mytable;
+----+--------------------------------+-------------+
| op | name | age |
+----+--------------------------------+-------------+
| +I | Jack | 18 |
| +I | mike | 24 |
+----+--------------------------------+-------------+
Received a total of 2 rows
Odkazy
- číst & zapsat Hive
- Názvy Apache, Apache Hive, Hive, Apache Flink, Flink a přidružených open source projektů jsou ochranné známky Apache Software Foundation (ASF).