Use o Hive Metastore com o API do DataStream do Apache Flink®
Observação
Desativaremos o Microsoft Azure HDInsight no AKS em 31 de janeiro de 2025. Para evitar o encerramento abrupto das suas cargas de trabalho, você precisará migrá-las para o Microsoft Fabric ou para um produto equivalente do Azure antes de 31 de janeiro de 2025. Os clusters restantes em sua assinatura serão interrompidos e removidos do host.
Somente o suporte básico estará disponível até a data de desativação.
Importante
Esse recurso está atualmente na visualização. Os Termos de uso complementares para versões prévias do Microsoft Azure incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, confira Informações sobre a versão prévia do Azure HDInsight no AKS. No caso de perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para ver mais atualizações sobre a Comunidade do Azure HDInsight.
Ao longo dos anos, o Metastore do Hive evoluiu para um centro de metadados de fato no ecossistema do Hadoop. Muitas empresas têm uma instância de serviço do Metastore do Hive separada em seus ambientes de produção para gerenciar todos os metadados (Metadados do Hive ou não pertencentes ao Hive). Para usuários que têm implantações de Hive e Flink, o HiveCatalog permite que eles usem o Metastore do Hive para gerenciar os metadados do Flink.
Versões do Hive com suporte para clusters do Apache Flink no Azure HDInsight no AKS
Versão do Hive com suporte:
- 3.1
- 3.1.0
- 3.1.1
- 3.1.2
- 3.1.3
Se você estiver criando seu próprio programa, precisará das dependências a seguir no arquivo mvn. Não é recomendável incluir essas dependências no arquivo jar resultante. Você deve adicionar dependências em runtime.
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Conectar-se ao Hive
Este exemplo ilustra vários trechos de conexão com o Hive, usando o Apache Flink no Azure HDInsight no AKS. É requerido que você use /opt/hive-conf
como diretório de configuração do Hive para se conectar ao metastore do Hive
package contoso.example;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class hiveDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start Table Environment
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
String catalogName = "myhive";
String defaultDatabase = HiveCatalog.DEFAULT_DB;
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
// register HiveCatalog in the tableEnv
tableEnv.registerCatalog("myhive", hive);
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
// Create a table in hive catalog
tableEnv.executeSql("create table MyTable (name varchar(32), age int) with ('connector' = 'filesystem', 'path' = 'abfs://flink@contosogen2.dfs.core.windows.net/data/', 'format' = 'csv','csv.field-delimiter' = ',')");
// Create a view in hive catalog
tableEnv.executeSql("create view MyView as select * from MyTable");
// Read from the table and print the results
tableEnv.from("MyTable").execute().print();
// 4. run stream
env.execute("Hive Demo on Flink");
}
}
No pod Webssh, mova o jar do planejador
Mova o jar flink-table-planner-loader-1.17.0-*.*.*.jar
localizado no /opt to /lib
do pod webssh e mova o jar flink-table-planner-loader-1.17.0-*.*.*.jar
do lib
. Consulte problema para obter mais detalhes. Execute as etapas a seguir para mover o jar do planejador.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-1.1.8.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-1.1.8.jar /opt/flink-webssh/lib/
Observação
Uma movimentação de jar de planejador extra só é necessária ao usar o dialeto do Hive ou o ponto de extremidade HiveServer2. No entanto, essa é a configuração recomendada para a integração do Hive.
Para obter mais informações, consulte Como usar o Catálogo do Hive com o Apache Flink® no HDInsight no AKS
Empacotar o jar e carregá-lo no Webssh e executar
user@sshnode-0 [ ~ ]$ bin/flink run -c contoso.example.hiveDemo -j FlinkSQLServerCDCDemo-1.0-SNAPSHOT.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID 5c887e1f8e1bfac501168c439a83788f
+----+--------------------------------+-------------+
| op | name | age |
+----+--------------------------------+-------------+
| +I | Jack | 18 |
| +I | mike | 24 |
+----+--------------------------------+-------------+
2 rows in set
Verificar o TRABALHO em execução na interface do usuário do Flink
Verificar a tabela na interface do usuário do Webssh via sql-client.sh
user@sshnode-0 [ ~ ]$ bin/sql-client.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
????????
????????????????
??????? ??????? ?
???? ????????? ?????
??? ??????? ?????
??? ??? ?????
?? ???????????????
?? ? ??? ?????? ?????
????? ???? ????? ?????
??????? ??? ??????? ???
????????? ?? ?? ??????????
???????? ?? ? ?? ???????
???? ??? ? ?? ???????? ?????
???? ? ?? ? ?? ???????? ???? ??
???? ???? ?????????? ??? ?? ????
???? ?? ??? ??????????? ???? ? ? ???
??? ?? ??? ????????? ???? ???
?? ? ??????? ???????? ??? ??
??? ??? ???????????????????? ???? ?
????? ??? ?????? ???????? ???? ??
???????? ??????????????? ??
?? ???? ??????? ??? ?????? ?? ???
??? ??? ??? ??????? ???? ?????????????
??? ????? ???? ?? ?? ???? ???
?? ??? ? ?? ?? ??
?? ?? ?? ?? ????????
?? ????? ?? ??????????? ??
?? ???? ? ??????? ??
??? ????? ?? ???????????
???? ???? ??????? ????????
????? ?? ???? ?????
????????????????????????????????? ?????
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /home/xcao/.flink-sql-history
Flink SQL> CREATE CATALOG myhive WITH (
> 'type' = 'hive'
> );
[INFO] Execute statement succeed.
Flink SQL> USE CATALOG myhive;
[INFO] Execute statement succeed.
Flink SQL> show tables
> ;
+------------+
| table name |
+------------+
| mytable |
| myview |
+------------+
2 rows in set
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.
Flink SQL> select * from mytable;
+----+--------------------------------+-------------+
| op | name | age |
+----+--------------------------------+-------------+
| +I | Jack | 18 |
| +I | mike | 24 |
+----+--------------------------------+-------------+
Received a total of 2 rows
Referências
- leitura e gravação do Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas registradas da Apache Software Foundation (ASF).