Używanie magazynu metadanych Hive z API Apache Flink® DataStream
Ważny
Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej z tego ogłoszenia .
Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.
Ważny
Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure zawierają więcej warunków prawnych, które mają zastosowanie do funkcji Azure będących w wersji beta, zapoznawczej lub jeszcze nieudostępnionych ogółowi. Aby uzyskać informacje na temat tej konkretnej wersji zapoznawczej, zobacz Azure HDInsight w usłudze AKS w wersji zapoznawczej informacji. W przypadku pytań lub sugestii dotyczących funkcji, prosimy o przesłanie zgłoszenia na AskHDInsight z szczegółami i śledzenie nas, aby otrzymywać więcej aktualizacji na Azure HDInsight Community.
W ciągu lat magazyn metadanych Hive ewoluował w de facto centrum metadanych w ekosystemie hadoop. Wiele firm ma oddzielne wystąpienie usługi Metastore Hive w swoich środowiskach produkcyjnych, aby zarządzać wszystkimi metadanymi (metadanymi Hive lub nie-Hive). W przypadku użytkowników, którzy mają wdrożenia programu Hive i Flink, program HiveCatalog umożliwia im używanie magazynu metadanych Hive do zarządzania metadanymi Flink.
Obsługiwane wersje programu Hive dla klastrów Apache Flink w usłudze HDInsight na platformie AKS
Obsługiwana wersja programu Hive:
- 3.1
- 3.1.0
- 3.1.1
- 3.1.2
- 3.1.3
Jeśli tworzysz własny program, potrzebujesz następujących zależności w pliku mvn. Nie zaleca się dołączania tych zależności do wynikowego pliku jar. Należy dodać zależności w czasie wykonywania.
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Nawiązywanie połączenia z usługą Hive
W tym przykładzie przedstawiono różne fragmenty kodu dotyczące nawiązywania połączenia z usługą Hive przy użyciu Apache Flink na platformie HDInsight w środowisku AKS. Aby połączyć się z magazynem metadanych Hive, musisz użyć /opt/hive-conf
jako katalogu konfiguracji Hive.
package contoso.example;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class hiveDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start Table Environment
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
String catalogName = "myhive";
String defaultDatabase = HiveCatalog.DEFAULT_DB;
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
// register HiveCatalog in the tableEnv
tableEnv.registerCatalog("myhive", hive);
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
// Create a table in hive catalog
tableEnv.executeSql("create table MyTable (name varchar(32), age int) with ('connector' = 'filesystem', 'path' = 'abfs://flink@contosogen2.dfs.core.windows.net/data/', 'format' = 'csv','csv.field-delimiter' = ',')");
// Create a view in hive catalog
tableEnv.executeSql("create view MyView as select * from MyTable");
// Read from the table and print the results
tableEnv.from("MyTable").execute().print();
// 4. run stream
env.execute("Hive Demo on Flink");
}
}
W podzie Webssh przenieś jar planner
Przenieś słoik flink-table-planner-loader-1.17.0-*.*.*.jar
znajdujący się w zasobniku webssh /opt to /lib
i przenieś słoik flink-table-planner-loader-1.17.0-*.*.*.jar
z lib
. Aby uzyskać więcej informacji, zobacz sprawę. Aby przenieść plik jar narzędzia do planowania, wykonaj następujące kroki.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-1.1.8.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-1.1.8.jar /opt/flink-webssh/lib/
Notatka
Dodatkowy plik jar dla planera jest potrzebny tylko podczas korzystania z dialektu Hive lub punktu końcowego HiveServer2. Jest to jednak zalecana konfiguracja integracji programu Hive.
Aby uzyskać więcej informacji, zobacz How to use Hive Catalog with Apache Flink on HDInsight on AKS (Jak używać katalogu Hive z usługą Apache Flink® w usłudze HDInsight w usłudze AKS)
Spakuj plik jar, prześlij go do WebSSH, a następnie uruchom
user@sshnode-0 [ ~ ]$ bin/flink run -c contoso.example.hiveDemo -j FlinkSQLServerCDCDemo-1.0-SNAPSHOT.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID 5c887e1f8e1bfac501168c439a83788f
+----+--------------------------------+-------------+
| op | name | age |
+----+--------------------------------+-------------+
| +I | Jack | 18 |
| +I | mike | 24 |
+----+--------------------------------+-------------+
2 rows in set
Sprawdź zadanie uruchomione w interfejsie użytkownika Flink
Sprawdzanie tabeli w interfejsie użytkownika protokołu Webssh za pośrednictwem sql-client.sh
user@sshnode-0 [ ~ ]$ bin/sql-client.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
????????
????????????????
??????? ??????? ?
???? ????????? ?????
??? ??????? ?????
??? ??? ?????
?? ???????????????
?? ? ??? ?????? ?????
????? ???? ????? ?????
??????? ??? ??????? ???
????????? ?? ?? ??????????
???????? ?? ? ?? ???????
???? ??? ? ?? ???????? ?????
???? ? ?? ? ?? ???????? ???? ??
???? ???? ?????????? ??? ?? ????
???? ?? ??? ??????????? ???? ? ? ???
??? ?? ??? ????????? ???? ???
?? ? ??????? ???????? ??? ??
??? ??? ???????????????????? ???? ?
????? ??? ?????? ???????? ???? ??
???????? ??????????????? ??
?? ???? ??????? ??? ?????? ?? ???
??? ??? ??? ??????? ???? ?????????????
??? ????? ???? ?? ?? ???? ???
?? ??? ? ?? ?? ??
?? ?? ?? ?? ????????
?? ????? ?? ??????????? ??
?? ???? ? ??????? ??
??? ????? ?? ???????????
???? ???? ??????? ????????
????? ?? ???? ?????
????????????????????????????????? ?????
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /home/xcao/.flink-sql-history
Flink SQL> CREATE CATALOG myhive WITH (
> 'type' = 'hive'
> );
[INFO] Execute statement succeed.
Flink SQL> USE CATALOG myhive;
[INFO] Execute statement succeed.
Flink SQL> show tables
> ;
+------------+
| table name |
+------------+
| mytable |
| myview |
+------------+
2 rows in set
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.
Flink SQL> select * from mytable;
+----+--------------------------------+-------------+
| op | name | age |
+----+--------------------------------+-------------+
| +I | Jack | 18 |
| +I | mike | 24 |
+----+--------------------------------+-------------+
Received a total of 2 rows
Bibliografia
- hive odczyt & zapisu
- Nazwy projektów Apache, Apache Hive, Hive, Apache Flink, Flink oraz powiązane z nimi nazwy projektów open source są znakami towarowymiApache Software Foundation (ASF).