Udostępnij za pośrednictwem


Używanie magazynu metadanych Hive z interfejsem API apache Flink® DataStream

Uwaga

Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.

Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.

Ważne

Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.

W ciągu lat magazyn metadanych Hive ewoluował w de facto centrum metadanych w ekosystemie hadoop. Wiele firm ma oddzielne wystąpienie usługi Magazynu metadanych Hive w swoich środowiskach produkcyjnych, aby zarządzać wszystkimi metadanymi (metadanymi Hive lub innym niż Hive). W przypadku użytkowników, którzy mają wdrożenia programu Hive i Flink, program HiveCatalog umożliwia im używanie magazynu metadanych Hive do zarządzania metadanymi Flink.

Obsługiwana wersja programu Hive:

  • 3.1
    • 3.1.0
    • 3.1.1
    • 3.1.2
    • 3.1.3

Jeśli tworzysz własny program, potrzebujesz następujących zależności w pliku mvn. Nie zaleca się dołączania tych zależności do wynikowego pliku jar. Należy dodać zależności w czasie wykonywania.

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
</dependency>

Nawiązywanie połączenia z usługą Hive

W tym przykładzie pokazano różne fragmenty kodu nawiązywania połączenia z usługą Hive przy użyciu narzędzia Apache Flink w usłudze HDInsight w usłudze AKS. Aby nawiązać połączenie z magazynem metadanych Hive, musisz użyć /opt/hive-conf go jako katalogu konfiguracji hive

package contoso.example;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class hiveDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // start Table Environment
        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(env);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String catalogName = "myhive";
        String defaultDatabase = HiveCatalog.DEFAULT_DB;
        String hiveConfDir = "/opt/hive-conf";
        HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
        // register HiveCatalog in the tableEnv
        tableEnv.registerCatalog("myhive", hive);
        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive");
        // Create a table in hive catalog
        tableEnv.executeSql("create table MyTable (name varchar(32), age int) with ('connector' = 'filesystem', 'path' = 'abfs://flink@contosogen2.dfs.core.windows.net/data/', 'format' = 'csv','csv.field-delimiter' = ',')");
        // Create a view in hive catalog
        tableEnv.executeSql("create view MyView as select * from MyTable");

        // Read from the table and print the results
        tableEnv.from("MyTable").execute().print();
        // 4. run stream
        env.execute("Hive Demo on Flink");
    }
}

W zasobniku Webssh przenieś plik jar planisty

Przenieś plik jar flink-table-planner-loader-1.17.0-*.*.*.jar znajdujący się w zasobnikach webssh /opt to /lib i wyjdź plik jar flink-table-planner-loader-1.17.0-*.*.*.jar z pliku lib. Aby uzyskać więcej informacji, zobacz problem. Wykonaj następujące kroki, aby przenieść plik jar planisty.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-1.1.8.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-1.1.8.jar /opt/flink-webssh/lib/

Uwaga

Dodatkowy plik jar planisty jest potrzebny tylko w przypadku korzystania z dialektu Hive lub punktu końcowego HiveServer2. Jest to jednak zalecana konfiguracja integracji programu Hive.

Aby uzyskać więcej informacji, zobacz How to use Hive Catalog with Apache Flink on HDInsight on AKS (Jak używać katalogu Hive z usługą Apache Flink® w usłudze HDInsight w usłudze AKS)

Spakuj plik jar i przekaż go do protokołu Webssh i uruchom polecenie

user@sshnode-0 [ ~ ]$ bin/flink run -c contoso.example.hiveDemo -j FlinkSQLServerCDCDemo-1.0-SNAPSHOT.jar 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID 5c887e1f8e1bfac501168c439a83788f
+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           Jack |          18 |
| +I |                           mike |          24 |
+----+--------------------------------+-------------+
2 rows in set

Zrzut ekranu przedstawiający sposób sprawdzania stanu zadania.

Sprawdź tabelę w interfejsie użytkownika protokołu Webssh za pośrednictwem sql-client.sh

user@sshnode-0 [ ~ ]$ bin/sql-client.sh 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

                                   ????????
                               ????????????????
                            ???????        ???????  ?
                          ????   ?????????      ?????
                          ???         ???????    ?????
                            ???            ???   ?????
                              ??       ???????????????
                            ?? ?   ???       ?????? ?????
                            ?????   ????      ????? ?????
                         ???????       ???    ??????? ???
                   ????????? ??         ??    ??????????
                  ????????  ??           ?   ?? ???????
                ????  ???            ?  ?? ???????? ?????
               ???? ? ??          ? ?? ????????    ????  ??
              ???? ????          ??????????       ??? ?? ????
           ???? ?? ???       ???????????         ????  ? ?  ???
           ???  ?? ??? ?????????              ????           ???
           ??    ? ???????              ????????          ??? ??
           ???    ???    ????????????????????            ????  ?
          ????? ???   ??????   ????????                  ????  ??
          ????????  ???????????????                            ??
          ?? ????   ???????  ???       ??????    ??          ???
          ??? ???  ???  ???????            ????   ?????????????
           ??? ?????  ????  ??                ??      ????   ???
           ??   ???   ?     ??                ??              ??
            ??   ??         ??                 ??        ????????
             ?? ?????       ??                  ???????????    ??
              ??   ????      ?                    ???????      ??
               ???   ?????                         ?? ???????????
                ????    ????                     ??????? ????????
                  ?????                          ??  ????  ?????
                      ?????????????????????????????????  ?????
          
    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /home/xcao/.flink-sql-history


Flink SQL> CREATE CATALOG myhive WITH (
>     'type' = 'hive'
> );
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG myhive;
[INFO] Execute statement succeed.

Flink SQL> show tables
> ;
+------------+
| table name |
+------------+
|    mytable |
|     myview |
+------------+
2 rows in set

Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.

Flink SQL> select * from mytable;
+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           Jack |          18 |
| +I |                           mike |          24 |
+----+--------------------------------+-------------+
Received a total of 2 rows

Informacje