Delen via


Hive Metastore gebruiken met Apache Flink® DataStream-API

Belangrijk

Azure HDInsight op AKS is op 31 januari 2025 buiten gebruik gesteld. Meer informatie over in deze aankondiging.

U moet uw workloads migreren naar Microsoft Fabric- of een gelijkwaardig Azure-product om plotselinge beëindiging van uw workloads te voorkomen.

Belangrijk

Deze functie is momenteel beschikbaar als preview-versie. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews meer juridische voorwaarden bevatten die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet in algemene beschikbaarheid zijn vrijgegeven. Zie Azure HDInsight in AKS preview-informatievoor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight- met de details en volgt u ons voor meer updates over Azure HDInsight Community-.

Hive Metastore is in de loop der jaren uitgegroeid tot een de facto metagegevenscentrum in het Hadoop-ecosysteem. Veel bedrijven hebben een afzonderlijk Hive Metastore-service-exemplaar in hun productieomgevingen om al hun metagegevens (Hive- of niet-Hive-metagegevens) te beheren. Voor gebruikers die zowel Hive- als Flink-implementaties hebben, stelt HiveCatalog hen in staat om Hive Metastore te gebruiken om de metagegevens van Flink te beheren.

Ondersteunde Hive-versie:

  • 3.1
    • 3.1.0
    • 3.1.1
    • 3.1.2
    • 3.1.3

Als u uw eigen programma bouwt, hebt u de volgende afhankelijkheden in uw mvn-bestand nodig. Het wordt niet aanbevolen om deze afhankelijkheden op te nemen in het resulterende JAR-bestand. U moet tijdens runtime afhankelijkheden toevoegen.

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
</dependency>

Verbinding maken met Hive

In dit voorbeeld ziet u verschillende fragmenten voor het verbinden met Hive, waarbij gebruik wordt gemaakt van Apache Flink op HDInsight en AKS. U dient /opt/hive-conf als Hive-configuratiemap te gebruiken om verbinding te maken met de Hive Metastore.

package contoso.example;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class hiveDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // start Table Environment
        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(env);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String catalogName = "myhive";
        String defaultDatabase = HiveCatalog.DEFAULT_DB;
        String hiveConfDir = "/opt/hive-conf";
        HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
        // register HiveCatalog in the tableEnv
        tableEnv.registerCatalog("myhive", hive);
        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive");
        // Create a table in hive catalog
        tableEnv.executeSql("create table MyTable (name varchar(32), age int) with ('connector' = 'filesystem', 'path' = 'abfs://flink@contosogen2.dfs.core.windows.net/data/', 'format' = 'csv','csv.field-delimiter' = ',')");
        // Create a view in hive catalog
        tableEnv.executeSql("create view MyView as select * from MyTable");

        // Read from the table and print the results
        tableEnv.from("MyTable").execute().print();
        // 4. run stream
        env.execute("Hive Demo on Flink");
    }
}

Verplaats de planner-JAR op Webssh-pod

Verplaats de JAR-flink-table-planner-loader-1.17.0-*.*.*.jar die zich in pod /opt to /lib van de webssh-pod bevindt en haal de JAR-flink-table-planner-loader-1.17.0-*.*.*.jar uit lib. Raadpleeg de kwestie voor meer details. Voer de volgende stappen uit om de planner-JAR te verplaatsen.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-1.1.8.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-1.1.8.jar /opt/flink-webssh/lib/

Notitie

Een extra verplaatsing van de planner-jar is alleen nodig bij het gebruik van de Hive-dialect of het HiveServer2-eindpunt. Dit is echter de aanbevolen installatie voor Hive-integratie.

Zie Hive Catalog gebruiken met Apache Flink® in HDInsight op AKS voor meer informatie

Pak het JAR-bestand in en upload deze naar Webssh en voer deze uit

user@sshnode-0 [ ~ ]$ bin/flink run -c contoso.example.hiveDemo -j FlinkSQLServerCDCDemo-1.0-SNAPSHOT.jar 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID 5c887e1f8e1bfac501168c439a83788f
+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           Jack |          18 |
| +I |                           mike |          24 |
+----+--------------------------------+-------------+
2 rows in set

schermopname die laat zien hoe u de taakstatus controleert.

Tabel controleren in de WebSSH-gebruikersinterface via sql-client.sh

user@sshnode-0 [ ~ ]$ bin/sql-client.sh 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

                                   ????????
                               ????????????????
                            ???????        ???????  ?
                          ????   ?????????      ?????
                          ???         ???????    ?????
                            ???            ???   ?????
                              ??       ???????????????
                            ?? ?   ???       ?????? ?????
                            ?????   ????      ????? ?????
                         ???????       ???    ??????? ???
                   ????????? ??         ??    ??????????
                  ????????  ??           ?   ?? ???????
                ????  ???            ?  ?? ???????? ?????
               ???? ? ??          ? ?? ????????    ????  ??
              ???? ????          ??????????       ??? ?? ????
           ???? ?? ???       ???????????         ????  ? ?  ???
           ???  ?? ??? ?????????              ????           ???
           ??    ? ???????              ????????          ??? ??
           ???    ???    ????????????????????            ????  ?
          ????? ???   ??????   ????????                  ????  ??
          ????????  ???????????????                            ??
          ?? ????   ???????  ???       ??????    ??          ???
          ??? ???  ???  ???????            ????   ?????????????
           ??? ?????  ????  ??                ??      ????   ???
           ??   ???   ?     ??                ??              ??
            ??   ??         ??                 ??        ????????
             ?? ?????       ??                  ???????????    ??
              ??   ????      ?                    ???????      ??
               ???   ?????                         ?? ???????????
                ????    ????                     ??????? ????????
                  ?????                          ??  ????  ?????
                      ?????????????????????????????????  ?????
          
    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /home/xcao/.flink-sql-history


Flink SQL> CREATE CATALOG myhive WITH (
>     'type' = 'hive'
> );
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG myhive;
[INFO] Execute statement succeed.

Flink SQL> show tables
> ;
+------------+
| table name |
+------------+
|    mytable |
|     myview |
+------------+
2 rows in set

Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.

Flink SQL> select * from mytable;
+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           Jack |          18 |
| +I |                           mike |          24 |
+----+--------------------------------+-------------+
Received a total of 2 rows

Verwijzingen