Hive Metastore gebruiken met Apache Flink® DataStream-API


Hive Metastore is in de loop der jaren uitgegroeid tot een de facto metagegevenscentrum in het Hadoop-ecosysteem. Veel bedrijven hebben een afzonderlijk Hive Metastore-service-exemplaar in hun productieomgevingen om al hun metagegevens (Hive- of niet-Hive-metagegevens) te beheren. Voor gebruikers die zowel Hive- als Flink-implementaties hebben, stelt HiveCatalog hen in staat om Hive Metastore te gebruiken om de metagegevens van Flink te beheren.

Ondersteunde Hive-versie:

  • 3.1
    • 3.1.0
    • 3.1.1
    • 3.1.2
    • 3.1.3

Als u uw eigen programma bouwt, hebt u de volgende afhankelijkheden in uw mvn-bestand nodig. Het wordt niet aanbevolen om deze afhankelijkheden op te nemen in het resulterende JAR-bestand. U moet tijdens runtime afhankelijkheden toevoegen.

Verbinding maken met Hive

In dit voorbeeld ziet u verschillende fragmenten voor het verbinden met Hive, waarbij gebruik wordt gemaakt van Apache Flink op HDInsight en AKS. U dient /opt/hive-conf als Hive-configuratiemap te gebruiken om verbinding te maken met de Hive Metastore.

package contoso.example;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class hiveDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // start Table Environment
        StreamTableEnvironment tableEnv =
        String catalogName = "myhive";
        String defaultDatabase = HiveCatalog.DEFAULT_DB;
        String hiveConfDir = "/opt/hive-conf";
        HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
        // register HiveCatalog in the tableEnv
        tableEnv.registerCatalog("myhive", hive);
        // set the HiveCatalog as the current catalog of the session
        // Create a table in hive catalog
        tableEnv.executeSql("create table MyTable (name varchar(32), age int) with ('connector' = 'filesystem', 'path' = 'abfs://', 'format' = 'csv','csv.field-delimiter' = ',')");
        // Create a view in hive catalog
        tableEnv.executeSql("create view MyView as select * from MyTable");

        // Read from the table and print the results
        // 4. run stream
        env.execute("Hive Demo on Flink");

Verplaats de planner-JAR op Webssh-pod

Verplaats de JAR-flink-table-planner-loader-1.17.0-*.*.*.jar die zich in pod /opt to /lib van de webssh-pod bevindt en haal de JAR-flink-table-planner-loader-1.17.0-*.*.*.jar uit lib. Raadpleeg de kwestie voor meer details. Voer de volgende stappen uit om de planner-JAR te verplaatsen.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-1.1.8.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-1.1.8.jar /opt/flink-webssh/lib/


Een extra verplaatsing van de planner-jar is alleen nodig bij het gebruik van de Hive-dialect of het HiveServer2-eindpunt. Dit is echter de aanbevolen installatie voor Hive-integratie.

Zie Hive Catalog gebruiken met Apache Flink® in HDInsight op AKS voor meer informatie

Pak het JAR-bestand in en upload deze naar Webssh en voer deze uit

user@sshnode-0 [ ~ ]$ bin/flink run -c contoso.example.hiveDemo -j FlinkSQLServerCDCDemo-1.0-SNAPSHOT.jar 
Job has been submitted with JobID 5c887e1f8e1bfac501168c439a83788f
| op |                           name |         age |
| +I |                           Jack |          18 |
| +I |                           mike |          24 |
2 rows in set

schermopname die laat zien hoe u de taakstatus controleert.

Tabel controleren in de WebSSH-gebruikersinterface via

user@sshnode-0 [ ~ ]$ bin/ 
Command history file path: /home/xcao/.flink-sql-history

>     'type' = 'hive'
> );
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG myhive;
[INFO] Execute statement succeed.

Flink SQL> show tables
> ;
| table name |
|    mytable |
|     myview |
2 rows in set

Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.

Flink SQL> select * from mytable;
| op |                           name |         age |
| +I |                           Jack |          18 |
| +I |                           mike |          24 |
Received a total of 2 rows
