Table API y SQL en clústeres de Apache Flink® en HDInsight en AKS
Importante
Azure HDInsight en AKS se retiró el 31 de enero de 2025. Obtenga más información con este anuncio.
Debe migrar las cargas de trabajo a microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo.
Importante
Esta característica está actualmente en versión preliminar. Los Términos de uso complementarios para las versiones preliminares de Microsoft Azure incluyen más términos legales que se aplican a las características de Azure que se encuentran en versión beta, en versión preliminar o, de lo contrario, aún no se han publicado en disponibilidad general. Para obtener información sobre esta vista previa específica, consulte la información sobre Azure HDInsight en AKS . Para preguntas o sugerencias de características, envíe una solicitud en AskHDInsight con los detalles y síganos para obtener más actualizaciones sobre Comunidad de Azure HDInsight.
Apache Flink incluye dos API relacionales( Table API y SQL) para el procesamiento por lotes y flujos unificados. La Table API es una API de consultas integrada en el lenguaje que permite la composición intuitiva de consultas a partir de operadores relacionales como la selección, el filtro y la unión. La compatibilidad con SQL de Flink se basa en ApacheCalcte, que implementa el estándar SQL.
Las interfaces table API y SQL se integran perfectamente entre sí y la API DataStream de Flink. Puede cambiar fácilmente entre todas las API y bibliotecas, que se basan en ellas.
Apache Flink SQL
Al igual que otros motores de SQL, las consultas de Flink funcionan sobre tablas. Difiere de una base de datos tradicional porque Flink no administra los datos en reposo localmente; en su lugar, sus consultas funcionan continuamente sobre tablas externas.
Las canalizaciones de procesamiento de datos de Flink comienzan con tablas de origen y terminan con tablas de destino. Las tablas de origen generan filas operadas durante la ejecución de la consulta; son las tablas a las que se hace referencia en la cláusula FROM de una consulta. Los conectores pueden ser de tipo HDInsight Kafka, HDInsight HBase, Azure Event Hubs, bases de datos, sistemas de archivos o cualquier otro sistema cuyo conector se encuentra en la ruta de clase.
Uso de Flink SQL Client en HDInsight en clústeres de AKS
Puede consultar este artículo sobre cómo usar la CLI desde Secure Shell en Azure Portal. Estos son algunos ejemplos rápidos de cómo empezar.
Para iniciar el cliente SQL
./bin/sql-client.sh
Para pasar un archivo sql de inicialización para ejecutarse junto con sql-client
./sql-client.sh -i /path/to/init_file.sql
Para establecer una configuración en sql-client
SET execution.runtime-mode = streaming; SET sql-client.execution.result-mode = table; SET sql-client.execution.max-table-result.rows = 10000;
SQL DDL
Flink SQL admite las siguientes instrucciones CREATE
- CREATE TABLE
- CREAR BASE DE DATOS
- Crear catálogo
A continuación se muestra una sintaxis de ejemplo para definir una tabla de origen mediante el conector jdbc para conectarse a MSSQL, con las columnas id y nombre en una sentencia CREATE TABLE.
CREATE TABLE student_information (
id BIGINT,
name STRING,
address STRING,
grade STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
'table-name' = 'students',
'username' = 'username',
'password' = 'password'
);
CREATE DATABASE :
CREATE DATABASE students;
CREAR CATÁLOGO:
CREATE CATALOG myhive WITH ('type'='hive');
Puede ejecutar consultas continuas sobre estas tablas.
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Escribir en la tabla de destino desde la tabla de origen :
INSERT INTO grade_counts
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Agregar dependencias
Las instrucciones JAR se utilizan para añadir los archivos JAR de usuario al classpath, eliminar los archivos JAR de usuario del classpath o mostrar los archivos JAR añadidos en el classpath durante la ejecución.
Flink SQL admite las siguientes instrucciones JAR:
- AGREGAR JAR
- MOSTRAR ARCHIVOS JAR
- REMOVE JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.
Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.
Flink SQL> SHOW JARS;
+----------------------------+
| jars |
+----------------------------+
| /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+
Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.
Metastore de Hive en clústeres de Apache Flink® en HDInsight en AKS
Los catálogos proporcionan metadatos, como bases de datos, tablas, particiones, vistas y funciones e información necesarias para acceder a los datos almacenados en una base de datos u otros sistemas externos.
En HDInsight en AKS, Flink admite dos opciones de catálogo:
genericInMemoryCatalog
El GenericInMemoryCatalog es una implementación en memoria de un catálogo. Todos los objetos solo están disponibles durante la vigencia de la sesión sql.
HiveCatalog
El hiveCatalog de sirve para dos propósitos; como almacenamiento persistente para metadatos de Flink puros y como una interfaz para leer y escribir metadatos de Hive existentes.
Nota
HDInsight en clústeres de AKS incluye una opción integrada de Hive Metastore para Apache Flink. Puede optar por Hive Metastore durante la creación del clúster
Cómo crear y registrar bases de datos de Flink en catálogos
Puede consultar este artículo sobre cómo usar la CLI y empezar a trabajar con Flink SQL Client desde Secure Shell en Azure Portal.
Iniciar la sesión
sql-client.sh
Default_catalog es el catálogo en memoria predeterminado.
Ahora vamos a comprobar la base de datos predeterminada del catálogo en memoria
Vamos a crear el catálogo de Hive de la versión 3.1.2 y usarlo
CREATE CATALOG myhive WITH ('type'='hive'); USE CATALOG myhive;
Nota
HDInsight en AKS admite Hive 3.1.2 y Hadoop 3.3.2. El
hive-conf-dir
se establece en ubicación/opt/hive-conf
Vamos a crear la Base de datos en el "catálogo de Hive" y establecerla como predeterminada para la sesión (a menos que se cambie).
Cómo crear y registrar tablas de Hive en el catálogo de Hive
Siga las instrucciones de Cómo crear y registrar bases de datos de Flink en el catálogo
Vamos a crear la tabla Flink del tipo de conector Hive sin partición
CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Insertar datos en hive_table
INSERT INTO hive_table SELECT 2, '10'; INSERT INTO hive_table SELECT 3, '20';
Leer datos de hive_table
Flink SQL> SELECT * FROM hive_table; 2023-07-24 09:46:22,225 INFO org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3 +----+-------------+--------------------------------+ | op | x | days | +----+-------------+--------------------------------+ | +I | 3 | 20 | | +I | 2 | 10 | | +I | 1 | 5 | +----+-------------+--------------------------------+ Received a total of 3 rows
Nota
El directorio Hive Warehouse se encuentra en el contenedor designado de la cuenta de almacenamiento elegida durante la creación del clúster de Apache Flink y se puede encontrar en el directorio hive/warehouse.
Vamos a crear una tabla Flink de tipo de conector hive con partición.
CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Importante
Hay una limitación conocida en Apache Flink. Las últimas columnas 'n' se eligen para las particiones, independientemente de la columna de partición definida por el usuario. FLINK-32596 La clave de partición será incorrecta al usar el dialecto de Flink para crear una tabla de Hive.
Referencia
- Apache Flink Table API & SQL
- Los nombres de proyecto de código abierto asociados, Apache, Apache Flink, Flink y son marcas comerciales de la Apache Software Foundation (ASF).