Compartilhar via


API de tabela e SQL em clusters do Apache Flink® no HDInsight no AKS

Importante

O Azure HDInsight no AKS se aposentou em 31 de janeiro de 2025. Saiba mais com este comunicado.

Você precisa migrar suas cargas de trabalho para microsoft fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.

Importante

Esse recurso está atualmente em versão prévia. Os termos de uso complementares para o Microsoft Azure Previews incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, consulte Azure HDInsight em informações de visualização do AKS. Para obter perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações sobre da Comunidade do Azure HDInsight.

O Apache Flink apresenta duas APIs relacionais - a API de Tabela e o SQL - para processamento unificado de fluxo e lote. A API de Tabela é uma API de consulta integrada à linguagem que permite a composição de consultas de operadores relacionais, como seleção, filtro e junção intuitivamente. O suporte ao SQL do Flink baseia-se no Apache Calcite, que implementa o padrão SQL.

As interfaces de API de Tabela e SQL se integram perfeitamente com a API DataStream do Flink. Você pode alternar facilmente entre todas as APIs e bibliotecas, que se baseiam nelas.

Assim como outros mecanismos SQL, as consultas Flink operam na parte superior das tabelas. Ele difere de um banco de dados tradicional porque o Flink não gerencia dados em repouso localmente; Em vez disso, suas consultas operam continuamente em tabelas externas.

Os pipelines de processamento de dados do Flink começam com tabelas de origem e terminam com tabelas de destino. As tabelas de origem produzem as linhas sobre as quais operamos durante a execução da consulta; são as tabelas referenciadas na cláusula FROM de uma consulta. Os conectores podem ser do tipo HDInsight Kafka, HDInsight HBase, Hubs de Eventos do Azure, bancos de dados, sistemas de arquivos ou qualquer outro sistema cujo conector esteja no classpath.

Você pode consultar este artigo sobre como usar a CLI do Secure Shell no portal do Azure. Aqui estão alguns exemplos rápidos de como começar.

  • Para iniciar o cliente SQL

    ./bin/sql-client.sh
    
  • Para passar um arquivo sql de inicialização para execução junto com o sql-client

    ./sql-client.sh -i /path/to/init_file.sql
    
  • Para definir uma configuração no sql-client

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL (Linguagem de Definição de Dados)

O Flink SQL dá suporte às seguintes instruções CREATE

  • CRIAR TABELA
  • CRIAR BANCO DE DADOS
  • CRIAR CATÁLOGO

Veja a seguir uma sintaxe de exemplo, para definir uma tabela de origem usando o conector JDBC para se conectar ao MSSQL, com id e nome como colunas em uma instrução CREATE TABLE

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

CREATE DATABASE:

CREATE DATABASE students;

CRIAR CATÁLOGO:

CREATE CATALOG myhive WITH ('type'='hive');

Você pode executar consultas contínuas na parte superior dessas tabelas

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Escreva na Tabela de Destino da Tabela de Origem .

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Adicionando dependências

Instruções JAR são usadas para adicionar jars de usuário ao classpath ou remover jars de usuário do classpath ou mostrar jars adicionados no classpath no runtime.

O Flink SQL dá suporte às seguintes instruções JAR:

  • ADICIONAR JAR
  • MOSTRAR JARS
  • REMOVER JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

Os catálogos fornecem metadados, como bancos de dados, tabelas, partições, exibições e funções e informações necessárias para acessar dados armazenados em um banco de dados ou em outros sistemas externos.

No HDInsight no AKS, o Flink oferece suporte a duas opções de catálogo:

CatálogoGenéricoEmMemória

O GenericInMemoryCatalog é uma implementação na memória de um catálogo. Todos os objetos estão disponíveis apenas para o tempo de vida da sessão sql.

hiveCatalog

O HiveCatalog atende a duas finalidades; como armazenamento persistente para metadados Flink puros e como uma interface para ler e gravar metadados do Hive existentes.

Nota

O HDInsight em clusters de AKS possui uma opção integrada do Metastore do Hive para Apache Flink. Você pode optar pelo Metastore do Hive durante criação de cluster

Você pode consultar este artigo sobre como usar a CLI e começar a usar o Flink SQL Client do Secure Shell no portal do Azure.

  • Iniciar sessão sql-client.sh

    Captura de tela que mostra o catálogo padrão do Hive.

    Default_catalog é o catálogo padrão na memória

  • Agora, vamos verificar o banco de dados padrão do catálogo na memória Captura de tela mostrando catálogos padrão na memória.

  • Vamos criar o Catálogo do Hive da versão 3.1.2 e usá-lo

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    Nota

    O HDInsight no AKS dá suporte a hive 3.1.2 e Hadoop 3.3.2. O hive-conf-dir está definido para a localização /opt/hive-conf

  • Vamos criar o Banco de Dados no catálogo do Hive e defini-lo como padrão para a sessão (a menos que seja alterado). Captura de tela mostrando a criação de banco de dados no catálogo do hive e tornando-o o catálogo padrão para a sessão.

Como criar e registrar tabelas do Hive no catálogo do Hive

  • Siga as instruções sobre Como criar e registrar bancos de dados Flink no catálogo

  • Vamos criar a Tabela Flink do tipo de conector Hive sem partição

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • Inserir dados na tabela Hive

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • Ler dados da tabela Hive

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    Nota

    O Diretório do Hive Warehouse está localizado no contêiner designado da conta de armazenamento escolhida durante a criação do cluster Apache Flink, pode ser encontrado no diretório hive/warehouse/

  • Vamos criar uma tabela Flink do tipo conector Hive com Partição.

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

Importante

Há uma limitação conhecida no Apache Flink. As últimas colunas 'n' são escolhidas para partições, independentemente da coluna de partição definida pelo usuário. FLINK-32596 A chave de partição estará errada ao usar o dialeto Flink para criar a tabela hive.

Referência