API de tabela e SQL em clusters Apache Flink® no HDInsight no AKS
Nota
Vamos desativar o Azure HDInsight no AKS em 31 de janeiro de 2025. Antes de 31 de janeiro de 2025, você precisará migrar suas cargas de trabalho para o Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho. Os clusters restantes na sua subscrição serão interrompidos e removidos do anfitrião.
Apenas o apoio básico estará disponível até à data da reforma.
Importante
Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não disponibilizadas para disponibilidade geral. Para obter informações sobre essa visualização específica, consulte Informações de visualização do Azure HDInsight no AKS. Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações na Comunidade do Azure HDInsight.
O Apache Flink apresenta duas APIs relacionais - a API Table e SQL - para fluxo unificado e processamento em lote. A API de tabela é uma API de consulta integrada à linguagem que permite a composição de consultas de operadores relacionais, como seleção, filtro e junção intuitivamente. O suporte SQL da Flink é baseado no Apache Calcite, que implementa o padrão SQL.
A API de tabela e as interfaces SQL integram-se perfeitamente entre si e com a API DataStream da Flink. Você pode alternar facilmente entre todas as APIs e bibliotecas, que se baseiam nelas.
Apache Flink SQL
Como outros mecanismos SQL, as consultas Flink operam em cima de tabelas. Ele difere de um banco de dados tradicional porque o Flink não gerencia dados em repouso localmente; em vez disso, suas consultas operam continuamente em tabelas externas.
Os pipelines de processamento de dados Flink começam com tabelas de origem e terminam com tabelas de coletores. As tabelas de origem produzem linhas operadas durante a execução da consulta; são as tabelas referenciadas na cláusula FROM de uma consulta. Os conectores podem ser do tipo HDInsight Kafka, HDInsight HBase, Hubs de Eventos do Azure, bancos de dados, sistemas de arquivos ou qualquer outro sistema cujo conector esteja no classpath.
Usando o Flink SQL Client no HDInsight em clusters AKS
Você pode consultar este artigo sobre como usar a CLI do Secure Shell no portal do Azure. Aqui estão alguns exemplos rápidos de como começar.
Para iniciar o cliente SQL
./bin/sql-client.sh
Para passar um arquivo sql de inicialização para ser executado junto com sql-client
./sql-client.sh -i /path/to/init_file.sql
Para definir uma configuração no sql-client
SET execution.runtime-mode = streaming; SET sql-client.execution.result-mode = table; SET sql-client.execution.max-table-result.rows = 10000;
SQL DDL
Flink SQL suporta as seguintes instruções CREATE
- CREATE TABLE
- CREATE DATABASE
- CRIAR CATÁLOGO
A seguir está um exemplo de sintaxe para definir uma tabela de origem usando o conector jdbc para se conectar ao MSSQL, com id, nome como colunas em uma instrução CREATE TABLE
CREATE TABLE student_information (
id BIGINT,
name STRING,
address STRING,
grade STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
'table-name' = 'students',
'username' = 'username',
'password' = 'password'
);
CRIAR BASE DE DADOS :
CREATE DATABASE students;
CRIAR CATÁLOGO:
CREATE CATALOG myhive WITH ('type'='hive');
Você pode executar consultas contínuas na parte superior dessas tabelas
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Escreva na tabela do coletor a partir da tabela de origem:
INSERT INTO grade_counts
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Adicionando dependências
As instruções JAR são usadas para adicionar jars de usuário no classpath ou remover jars de usuário do classpath ou mostrar jars adicionados no classpath no tempo de execução.
O Flink SQL suporta as seguintes instruções JAR:
- ADD JAR
- MOSTRAR FRASCOS
- REMOVER JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.
Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.
Flink SQL> SHOW JARS;
+----------------------------+
| jars |
+----------------------------+
| /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+
Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.
Hive Metastore em clusters Apache Flink® no HDInsight no AKS
Os catálogos fornecem metadados, como bancos de dados, tabelas, partições, exibições e funções e informações necessárias para acessar dados armazenados em um banco de dados ou outros sistemas externos.
No HDInsight no AKS, o Flink suporta duas opções de catálogo:
GenericInMemoryCatalog
O GenericInMemoryCatalog é uma implementação na memória de um catálogo. Todos os objetos estão disponíveis apenas para o tempo de vida da sessão sql.
HiveCatalog
O HiveCatalog serve a duas finalidades: como armazenamento persistente para metadados Flink puros e como uma interface para ler e gravar metadados existentes do Hive.
Nota
O HDInsight em clusters AKS vem com uma opção integrada do Hive Metastore para Apache Flink. Você pode optar pelo Hive Metastore durante a criação do cluster
Como criar e registrar bancos de dados Flink em catálogos
Você pode consultar este artigo sobre como usar a CLI e começar a usar o Flink SQL Client do Secure Shell no portal do Azure.
Iniciar
sql-client.sh
sessãoDefault_catalog é o catálogo na memória padrão
Vamos agora verificar o banco de dados padrão do catálogo na memória
Vamos criar o Hive Catalog da versão 3.1.2 e usá-lo
CREATE CATALOG myhive WITH ('type'='hive'); USE CATALOG myhive;
Nota
O HDInsight no AKS suporta Hive 3.1.2 e Hadoop 3.3.2. O
hive-conf-dir
está definido como localização/opt/hive-conf
Vamos criar banco de dados no catálogo hive e torná-lo padrão para a sessão (a menos que seja alterado).
Como criar e registrar tabelas do Hive no catálogo do Hive
Siga as instruções sobre Como criar e registrar bancos de dados Flink para catálogo
Vamos criar Flink Tabela do conector tipo Hive sem partição
CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Inserir dados no hive_table
INSERT INTO hive_table SELECT 2, '10'; INSERT INTO hive_table SELECT 3, '20';
Ler dados de hive_table
Flink SQL> SELECT * FROM hive_table; 2023-07-24 09:46:22,225 INFO org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3 +----+-------------+--------------------------------+ | op | x | days | +----+-------------+--------------------------------+ | +I | 3 | 20 | | +I | 2 | 10 | | +I | 1 | 5 | +----+-------------+--------------------------------+ Received a total of 3 rows
Nota
O Hive Warehouse Directory está localizado no contêiner designado da conta de armazenamento escolhido durante a criação do cluster Apache Flink, pode ser encontrado no diretório hive/warehouse/
Vamos criar Flink Tabela de hive tipo conector com partição
CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Importante
Há uma limitação conhecida no Apache Flink. As últimas colunas 'n' são escolhidas para partições, independentemente da coluna de partição definida pelo usuário. FLINK-32596 A chave de partição estará errada quando usar o dialeto Flink para criar a tabela Hive.
Referência
- Apache Flink Table API & SQL
- Apache, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).