Partilhar via


Usar o Hive Metastore com a API Apache Flink® DataStream

Nota

Vamos desativar o Azure HDInsight no AKS em 31 de janeiro de 2025. Antes de 31 de janeiro de 2025, você precisará migrar suas cargas de trabalho para o Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho. Os clusters restantes na sua subscrição serão interrompidos e removidos do anfitrião.

Apenas o apoio básico estará disponível até à data da reforma.

Importante

Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não disponibilizadas para disponibilidade geral. Para obter informações sobre essa visualização específica, consulte Informações de visualização do Azure HDInsight no AKS. Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações na Comunidade do Azure HDInsight.

Ao longo dos anos, o Hive Metastore evoluiu para um centro de metadados de fato no ecossistema Hadoop. Muitas empresas têm uma instância de serviço Hive Metastore separada em seus ambientes de produção para gerenciar todos os metadados (metadados Hive ou não-Hive). Para usuários que têm implantações do Hive e do Flink, o HiveCatalog permite que eles usem o Hive Metastore para gerenciar os metadados do Flink.

Versão Hive suportada:

  • 3.1
    • 3.1.0
    • 3.1.1
    • 3.1.2
    • 3.1.3

Se você estiver criando seu próprio programa, precisará das seguintes dependências em seu arquivo mvn. Não é recomendável incluir essas dependências no arquivo jar resultante. Você deve adicionar dependências em tempo de execução.

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
</dependency>

Conecte-se ao Hive

Este exemplo ilustra vários trechos de conexão com a colmeia, usando o Apache Flink no HDInsight no AKS, é necessário usar /opt/hive-conf como diretório de configuração do hive para se conectar ao metastore do Hive

package contoso.example;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class hiveDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // start Table Environment
        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(env);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String catalogName = "myhive";
        String defaultDatabase = HiveCatalog.DEFAULT_DB;
        String hiveConfDir = "/opt/hive-conf";
        HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
        // register HiveCatalog in the tableEnv
        tableEnv.registerCatalog("myhive", hive);
        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive");
        // Create a table in hive catalog
        tableEnv.executeSql("create table MyTable (name varchar(32), age int) with ('connector' = 'filesystem', 'path' = 'abfs://flink@contosogen2.dfs.core.windows.net/data/', 'format' = 'csv','csv.field-delimiter' = ',')");
        // Create a view in hive catalog
        tableEnv.executeSql("create view MyView as select * from MyTable");

        // Read from the table and print the results
        tableEnv.from("MyTable").execute().print();
        // 4. run stream
        env.execute("Hive Demo on Flink");
    }
}

No pod Webssh, mova o frasco do planejador

Mova o frasco flink-table-planner-loader-1.17.0-*.*.*.jar localizado em webssh pod's /opt to /lib e mova o frasco flink-table-planner-loader-1.17.0-*.*.*.jar de lib. Consulte o problema para obter mais detalhes. Execute as etapas a seguir para mover o jar do planejador.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-1.1.8.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-1.1.8.jar /opt/flink-webssh/lib/

Nota

Um jar de planejador extra em movimento só é necessário ao usar o dialeto Hive ou o ponto de extremidade HiveServer2. No entanto, esta é a configuração recomendada para a integração do Hive.

Para obter mais informações, consulte Como usar o catálogo do Hive com o Apache Flink® no HDInsight no AKS

Empacote o jar e carregue-o no Webssh e execute

user@sshnode-0 [ ~ ]$ bin/flink run -c contoso.example.hiveDemo -j FlinkSQLServerCDCDemo-1.0-SNAPSHOT.jar 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID 5c887e1f8e1bfac501168c439a83788f
+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           Jack |          18 |
| +I |                           mike |          24 |
+----+--------------------------------+-------------+
2 rows in set

Captura de tela mostrando como verificar o status do trabalho.

Verifique a tabela na Webssh UI via sql-client.sh

user@sshnode-0 [ ~ ]$ bin/sql-client.sh 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-webssh/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/flink-hadoop-dep-1.17.0-1.1.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

                                   ????????
                               ????????????????
                            ???????        ???????  ?
                          ????   ?????????      ?????
                          ???         ???????    ?????
                            ???            ???   ?????
                              ??       ???????????????
                            ?? ?   ???       ?????? ?????
                            ?????   ????      ????? ?????
                         ???????       ???    ??????? ???
                   ????????? ??         ??    ??????????
                  ????????  ??           ?   ?? ???????
                ????  ???            ?  ?? ???????? ?????
               ???? ? ??          ? ?? ????????    ????  ??
              ???? ????          ??????????       ??? ?? ????
           ???? ?? ???       ???????????         ????  ? ?  ???
           ???  ?? ??? ?????????              ????           ???
           ??    ? ???????              ????????          ??? ??
           ???    ???    ????????????????????            ????  ?
          ????? ???   ??????   ????????                  ????  ??
          ????????  ???????????????                            ??
          ?? ????   ???????  ???       ??????    ??          ???
          ??? ???  ???  ???????            ????   ?????????????
           ??? ?????  ????  ??                ??      ????   ???
           ??   ???   ?     ??                ??              ??
            ??   ??         ??                 ??        ????????
             ?? ?????       ??                  ???????????    ??
              ??   ????      ?                    ???????      ??
               ???   ?????                         ?? ???????????
                ????    ????                     ??????? ????????
                  ?????                          ??  ????  ?????
                      ?????????????????????????????????  ?????
          
    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /home/xcao/.flink-sql-history


Flink SQL> CREATE CATALOG myhive WITH (
>     'type' = 'hive'
> );
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG myhive;
[INFO] Execute statement succeed.

Flink SQL> show tables
> ;
+------------+
| table name |
+------------+
|    mytable |
|     myview |
+------------+
2 rows in set

Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.

Flink SQL> select * from mytable;
+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           Jack |          18 |
| +I |                           mike |          24 |
+----+--------------------------------+-------------+
Received a total of 2 rows

Referências