Partager via


Utiliser le metastore Hive avec l’API Apache Flink® DataStream

Important

Azure HDInsight sur AKS a été mis hors service le 31 janvier 2025. En savoir plus avec cette annonce.

Vous devez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent pour éviter l’arrêt brusque de vos charges de travail.

Important

Cette fonctionnalité est actuellement en préversion. Les Conditions d’utilisation supplémentaires pour les préversions Microsoft Azure incluent des termes juridiques supplémentaires qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou qui ne sont pas encore publiées en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les informations sur la préversion de Azure HDInsight sur AKS . Pour des questions ou des suggestions de fonctionnalités, envoyez une demande sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur Communauté Azure HDInsight.

Au fil des ans, Hive Metastore a évolué en centre de métadonnées de facto dans l’écosystème Hadoop. De nombreuses entreprises disposent d’une instance de service Hive Metastore distincte dans leurs environnements de production pour gérer toutes leurs métadonnées (métadonnées Hive ou non Hive). Pour les utilisateurs disposant à la fois de déploiements Hive et Flink, HiveCatalog leur permet d’utiliser le metastore Hive pour gérer les métadonnées de Flink.

Version Hive prise en charge :

  • 3.1
    • 3.1.0
    • 3.1.1
    • 3.1.2
    • 3.1.3

Si vous créez votre propre programme, vous avez besoin des dépendances suivantes dans votre fichier mvn. Il n’est pas recommandé d’inclure ces dépendances dans le fichier jar résultant. Vous êtes censé ajouter des dépendances au moment de l’exécution.

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
</dependency>

Se connecter à Hive

Cet exemple illustre divers exemples de code de connexion à Hive, à l’aide d’Apache Flink sur HDInsight sur AKS, et vous devez utiliser /opt/hive-conf comme répertoire de configuration Hive pour vous connecter au metastore Hive.

package contoso.example;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class hiveDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // start Table Environment
        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(env);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String catalogName = "myhive";
        String defaultDatabase = HiveCatalog.DEFAULT_DB;
        String hiveConfDir = "/opt/hive-conf";
        HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
        // register HiveCatalog in the tableEnv
        tableEnv.registerCatalog("myhive", hive);
        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive");
        // Create a table in hive catalog
        tableEnv.executeSql("create table MyTable (name varchar(32), age int) with ('connector' = 'filesystem', 'path' = 'abfs://flink@contosogen2.dfs.core.windows.net/data/', 'format' = 'csv','csv.field-delimiter' = ',')");
        // Create a view in hive catalog
        tableEnv.executeSql("create view MyView as select * from MyTable");

        // Read from the table and print the results
        tableEnv.from("MyTable").execute().print();
        // 4. run stream
        env.execute("Hive Demo on Flink");
    }
}

Sur le pod Webssh, déplacez le fichier jar du planificateur

Déplacez le jar flink-table-planner-loader-1.17.0-*.*.*.jar situé dans le pod webssh /opt to /lib et sortez le jar flink-table-planner-loader-1.17.0-*.*.*.jar de lib. Pour plus d’informations, reportez-vous au problème. Effectuez les étapes suivantes pour déplacer le fichier JAR du planificateur.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-1.1.8.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-1.1.8.jar /opt/flink-webssh/lib/

Remarque

Un déplacement de fichier jar de planificateur supplémentaire est nécessaire uniquement lors de l’utilisation du dialecte Hive ou du point de terminaison HiveServer2. Toutefois, il s’agit de la configuration recommandée pour l’intégration de Hive.

Pour plus d’informations, consultez Guide pratique pour utiliser le catalogue Hive avec Apache Flink® sur HDInsight sur AKS

Empaqueter le fichier jar et le charger dans Webssh et exécuter

user@sshnode-0 [ ~ ]$ bin/flink run -c contoso.example.hiveDemo -j FlinkSQLServerCDCDemo-1.0-SNAPSHOT.jar 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID 5c887e1f8e1bfac501168c439a83788f
+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           Jack |          18 |
| +I |                           mike |          24 |
+----+--------------------------------+-------------+
2 rows in set

Capture d’écran montrant comment vérifier l’état du travail.

Vérifier la table sur l'interface utilisateur WebSSH via sql-client.sh

user@sshnode-0 [ ~ ]$ bin/sql-client.sh 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

                                   ????????
                               ????????????????
                            ???????        ???????  ?
                          ????   ?????????      ?????
                          ???         ???????    ?????
                            ???            ???   ?????
                              ??       ???????????????
                            ?? ?   ???       ?????? ?????
                            ?????   ????      ????? ?????
                         ???????       ???    ??????? ???
                   ????????? ??         ??    ??????????
                  ????????  ??           ?   ?? ???????
                ????  ???            ?  ?? ???????? ?????
               ???? ? ??          ? ?? ????????    ????  ??
              ???? ????          ??????????       ??? ?? ????
           ???? ?? ???       ???????????         ????  ? ?  ???
           ???  ?? ??? ?????????              ????           ???
           ??    ? ???????              ????????          ??? ??
           ???    ???    ????????????????????            ????  ?
          ????? ???   ??????   ????????                  ????  ??
          ????????  ???????????????                            ??
          ?? ????   ???????  ???       ??????    ??          ???
          ??? ???  ???  ???????            ????   ?????????????
           ??? ?????  ????  ??                ??      ????   ???
           ??   ???   ?     ??                ??              ??
            ??   ??         ??                 ??        ????????
             ?? ?????       ??                  ???????????    ??
              ??   ????      ?                    ???????      ??
               ???   ?????                         ?? ???????????
                ????    ????                     ??????? ????????
                  ?????                          ??  ????  ?????
                      ?????????????????????????????????  ?????
          
    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /home/xcao/.flink-sql-history


Flink SQL> CREATE CATALOG myhive WITH (
>     'type' = 'hive'
> );
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG myhive;
[INFO] Execute statement succeed.

Flink SQL> show tables
> ;
+------------+
| table name |
+------------+
|    mytable |
|     myview |
+------------+
2 rows in set

Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.

Flink SQL> select * from mytable;
+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           Jack |          18 |
| +I |                           mike |          24 |
+----+--------------------------------+-------------+
Received a total of 2 rows

Références