Usar Metastore de Hive con la API de DataStream de Apache Flink®
A lo largo de los años, Hive Metastore ha evolucionado en un centro de metadatos de facto en el ecosistema de Hadoop. Muchas empresas tienen una instancia de servicio de Metastore de Hive independiente en sus entornos de producción para administrar todos sus metadatos (hive o metadatos que no son de Hive). Para los usuarios que tienen implementaciones de Hive y Flink, HiveCatalog les permite usar Metastore de Hive para administrar los metadatos de Flink.
Versiones de Hive compatibles para clústeres de Apache Flink en HDInsight en AKS
Versión de Hive admitida:
- 3.1
- 3.1.0
- 3.1.1
- 3.1.2
- 3.1.3
Si va a compilar su propio programa, necesitará las siguientes dependencias en su archivo mvn. No se recomienda incluir estas dependencias en el archivo jar resultante. Se supone que deberías agregar dependencias durante la ejecución.
Conexión a Hive
En este ejemplo se muestran varios fragmentos de código para conexión a Hive, utilizando Apache Flink en HDInsight en AKS. Debes usar /opt/hive-conf
como directorio de configuración para Hive para conectarte con el metastore de Hive.
package contoso.example;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class hiveDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start Table Environment
StreamTableEnvironment tableEnv =
String catalogName = "myhive";
String defaultDatabase = HiveCatalog.DEFAULT_DB;
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
// register HiveCatalog in the tableEnv
tableEnv.registerCatalog("myhive", hive);
// set the HiveCatalog as the current catalog of the session
// Create a table in hive catalog
tableEnv.executeSql("create table MyTable (name varchar(32), age int) with ('connector' = 'filesystem', 'path' = 'abfs://', 'format' = 'csv','csv.field-delimiter' = ',')");
// Create a view in hive catalog
tableEnv.executeSql("create view MyView as select * from MyTable");
// Read from the table and print the results
// 4. run stream
env.execute("Hive Demo on Flink");
En el pod de Webssh, mueva el archivo JAR del planificador.
Mueva el tarro flink-table-planner-loader-1.17.0-*.*.*.jar
ubicado en el pod webssh /opt to /lib
y saque el tarro flink-table-planner-loader-1.17.0-*.*.*.jar
de lib
. Refiérase al tema para más detalles. Siga los siguientes pasos para mover el archivo .jar de Planner.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-1.1.8.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-1.1.8.jar /opt/flink-webssh/lib/
Solo se necesita un jar adicional de planner en movimiento al usar el dialecto de Hive o el punto de conexión de HiveServer2. Sin embargo, esta es la configuración recomendada para la integración de Hive.
Para obtener más información, consulte Uso del catálogo de Hive con Apache Flink® en HDInsight en AKS
Empaquetar el archivo jar y cargarlo en Webssh y ejecutarlo
user@sshnode-0 [ ~ ]$ bin/flink run -c contoso.example.hiveDemo -j FlinkSQLServerCDCDemo-1.0-SNAPSHOT.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID 5c887e1f8e1bfac501168c439a83788f
| op | name | age |
| +I | Jack | 18 |
| +I | mike | 24 |
2 rows in set
Verificar JOB en ejecución en la interfaz de usuario de Flink
Verificar la tabla en la interfaz de usuario de Webssh a través de
user@sshnode-0 [ ~ ]$ bin/
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /home/xcao/.flink-sql-history
> 'type' = 'hive'
> );
[INFO] Execute statement succeed.
Flink SQL> USE CATALOG myhive;
[INFO] Execute statement succeed.
Flink SQL> show tables
> ;
| table name |
| mytable |
| myview |
2 rows in set
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.
Flink SQL> select * from mytable;
| op | name | age |
| +I | Jack | 18 |
| +I | mike | 24 |
Received a total of 2 rows
- lectura de Hive & escritura
