Partilhar via


API de tabela e SQL em clusters Apache Flink® no HDInsight no AKS

Nota

Vamos desativar o Azure HDInsight no AKS em 31 de janeiro de 2025. Antes de 31 de janeiro de 2025, você precisará migrar suas cargas de trabalho para o Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho. Os clusters restantes na sua subscrição serão interrompidos e removidos do anfitrião.

Apenas o apoio básico estará disponível até à data da reforma.

Importante

Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não disponibilizadas para disponibilidade geral. Para obter informações sobre essa visualização específica, consulte Informações de visualização do Azure HDInsight no AKS. Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações na Comunidade do Azure HDInsight.

O Apache Flink apresenta duas APIs relacionais - a API Table e SQL - para fluxo unificado e processamento em lote. A API de tabela é uma API de consulta integrada à linguagem que permite a composição de consultas de operadores relacionais, como seleção, filtro e junção intuitivamente. O suporte SQL da Flink é baseado no Apache Calcite, que implementa o padrão SQL.

A API de tabela e as interfaces SQL integram-se perfeitamente entre si e com a API DataStream da Flink. Você pode alternar facilmente entre todas as APIs e bibliotecas, que se baseiam nelas.

Como outros mecanismos SQL, as consultas Flink operam em cima de tabelas. Ele difere de um banco de dados tradicional porque o Flink não gerencia dados em repouso localmente; em vez disso, suas consultas operam continuamente em tabelas externas.

Os pipelines de processamento de dados Flink começam com tabelas de origem e terminam com tabelas de coletores. As tabelas de origem produzem linhas operadas durante a execução da consulta; são as tabelas referenciadas na cláusula FROM de uma consulta. Os conectores podem ser do tipo HDInsight Kafka, HDInsight HBase, Hubs de Eventos do Azure, bancos de dados, sistemas de arquivos ou qualquer outro sistema cujo conector esteja no classpath.

Você pode consultar este artigo sobre como usar a CLI do Secure Shell no portal do Azure. Aqui estão alguns exemplos rápidos de como começar.

  • Para iniciar o cliente SQL

    ./bin/sql-client.sh
    
  • Para passar um arquivo sql de inicialização para ser executado junto com sql-client

    ./sql-client.sh -i /path/to/init_file.sql
    
  • Para definir uma configuração no sql-client

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL

Flink SQL suporta as seguintes instruções CREATE

  • CREATE TABLE
  • CREATE DATABASE
  • CRIAR CATÁLOGO

A seguir está um exemplo de sintaxe para definir uma tabela de origem usando o conector jdbc para se conectar ao MSSQL, com id, nome como colunas em uma instrução CREATE TABLE

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

CRIAR BASE DE DADOS :

CREATE DATABASE students;

CRIAR CATÁLOGO:

CREATE CATALOG myhive WITH ('type'='hive');

Você pode executar consultas contínuas na parte superior dessas tabelas

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Escreva na tabela do coletor a partir da tabela de origem:

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Adicionando dependências

As instruções JAR são usadas para adicionar jars de usuário no classpath ou remover jars de usuário do classpath ou mostrar jars adicionados no classpath no tempo de execução.

O Flink SQL suporta as seguintes instruções JAR:

  • ADD JAR
  • MOSTRAR FRASCOS
  • REMOVER JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

Os catálogos fornecem metadados, como bancos de dados, tabelas, partições, exibições e funções e informações necessárias para acessar dados armazenados em um banco de dados ou outros sistemas externos.

No HDInsight no AKS, o Flink suporta duas opções de catálogo:

GenericInMemoryCatalog

O GenericInMemoryCatalog é uma implementação na memória de um catálogo. Todos os objetos estão disponíveis apenas para o tempo de vida da sessão sql.

HiveCatalog

O HiveCatalog serve a duas finalidades: como armazenamento persistente para metadados Flink puros e como uma interface para ler e gravar metadados existentes do Hive.

Nota

O HDInsight em clusters AKS vem com uma opção integrada do Hive Metastore para Apache Flink. Você pode optar pelo Hive Metastore durante a criação do cluster

Você pode consultar este artigo sobre como usar a CLI e começar a usar o Flink SQL Client do Secure Shell no portal do Azure.

  • Iniciar sql-client.sh sessão

    Captura de tela mostrando o catálogo de hive padrão.

    Default_catalog é o catálogo na memória padrão

  • Vamos agora verificar o banco de dados padrão do catálogo na memória Captura de ecrã a mostrar catálogos predefinidos na memória.

  • Vamos criar o Hive Catalog da versão 3.1.2 e usá-lo

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    Nota

    O HDInsight no AKS suporta Hive 3.1.2 e Hadoop 3.3.2. O hive-conf-dir está definido como localização /opt/hive-conf

  • Vamos criar banco de dados no catálogo hive e torná-lo padrão para a sessão (a menos que seja alterado). Captura de tela mostrando a criação do banco de dados no catálogo do hive e tornando-o catálogo padrão para a sessão.

Como criar e registrar tabelas do Hive no catálogo do Hive

  • Siga as instruções sobre Como criar e registrar bancos de dados Flink para catálogo

  • Vamos criar Flink Tabela do conector tipo Hive sem partição

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • Inserir dados no hive_table

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • Ler dados de hive_table

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    Nota

    O Hive Warehouse Directory está localizado no contêiner designado da conta de armazenamento escolhido durante a criação do cluster Apache Flink, pode ser encontrado no diretório hive/warehouse/

  • Vamos criar Flink Tabela de hive tipo conector com partição

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

Importante

Há uma limitação conhecida no Apache Flink. As últimas colunas 'n' são escolhidas para partições, independentemente da coluna de partição definida pelo usuário. FLINK-32596 A chave de partição estará errada quando usar o dialeto Flink para criar a tabela Hive.

Referência