AKS 上运行 HDInsight 的 Apache Flink® 群集中的表 API 和 SQL

重要

AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 了解此公告的详细信息

需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。

重要

此功能目前以预览版提供。 Microsoft Azure 预览版补充使用条款 包括适用于 beta 版、预览版或尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览的信息,请参阅 AKS 上的 Azure HDInsight 预览信息。 有关问题或功能建议,请在 AskHDInsight 上提交请求,并提供详细信息,关注我们以获取 Azure HDInsight 社区 的更多更新。

Apache Flink 提供两个关系 API(表 API 和 SQL),用于统一流和批处理。 表 API 是一种语言集成的查询 API,它允许使用关系运算符(如选择、过滤和连接)直观地组合查询。 Flink 的 SQL 支持基于实现 SQL 标准的 Apache Calcite。

表 API 和 SQL 接口不仅彼此无缝集成,也与 Flink 的 DataStream API 无缝集成。 可以轻松地在所有 API 和基于它们构建的库之间进行切换。

与其他 SQL 引擎一样,Flink 查询在表之上运行。 它不同于传统数据库,因为 Flink 在本地不管理静态数据;相反,其查询在外部表上持续运行。

Flink 数据处理管道以源表开头,以汇集表结尾。 源表生成在查询执行期间被处理的行;它们是查询中 FROM 子句所引用的表。 连接器可以是 HDInsight Kafka、HDInsight HBase、Azure 事件中心、数据库、文件系统或任何其他位于类路径中的系统。

请参阅本文,了解如何在 Azure 门户上使用 Secure Shell 的 CLI。 下面是有关如何入门的一些快速示例。

  • 启动 SQL 客户端

    ./bin/sql-client.sh
    
  • 把用于初始化的 sql 文件传递给 sql-client 以运行

    ./sql-client.sh -i /path/to/init_file.sql
    
  • 在 sql-client 中设置配置

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL

Flink SQL 支持以下 CREATE 语句

  • 创建表格
  • 创建数据库
  • 创建目录

下面是一个示例语法,使用 JDBC 连接器来定义用于连接 MSSQL 的源表,其中包含 id 和 name 字段作为列的 CREATE TABLE 语句。

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

CREATE DATABASE

CREATE DATABASE students;

创建目录

CREATE CATALOG myhive WITH ('type'='hive');

可以在这些表的顶部运行连续查询

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

源表写出到 接收器表

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

添加依赖项

JAR 语句用于将用户 jar 添加到 classpath 中,或者从类路径中删除用户 jar,或在运行时的 classpath 中显示添加的 jar。

Flink SQL 支持以下 JAR 语句:

  • 添加 JAR
  • 显示 JAR
  • 删除 JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

目录提供元数据,例如数据库、表、分区、视图和函数以及访问存储在数据库或其他外部系统中的数据所需的信息。

在 AKS 上的 HDInsight 中,Flink 支持两个目录选项:

GenericInMemoryCatalog

GenericInMemoryCatalog 是一个内存实现的目录。 所有对象仅可用于 sql 会话的生存期。

HiveCatalog

HiveCatalog 有两个用途:作为纯 Flink 元数据的持久存储,以及作为读取和写入现有 Hive 元数据的接口。

注意

AKS 群集上的 HDInsight 提供了一个与 Apache Flink 兼容的 Hive Metastore 集成选项。 可以在创建群集 期间选择 Hive 元数据存储

请参阅本文,了解如何使用 CLI 并从 Azure 门户上的 Secure Shell 开始使用 Flink SQL 客户端。

  • 启动 sql-client.sh 会话

    显示默认 hive 目录的屏幕截图。

    Default_catalog是默认内存中目录

  • 现在让我们检查内存中目录的默认数据库 显示默认内存中目录的屏幕截图。

  • 让我们创建版本 3.1.2 的 Hive 目录并使用它

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    注意

    AKS 上的 HDInsight 支持 Hive 3.1.2Hadoop 3.3.2hive-conf-dir 设置为位置 /opt/hive-conf

  • 让我们在 hive 目录中创建数据库,并将其设置为会话的默认值(除非已更改)。 显示如何在 hive 目录中创建数据库,并将其设为会话的默认目录的屏幕截图。

如何创建 Hive 表并将其注册到 Hive 目录

  • 请按照 如何创建和注册 Flink 数据库到目录 的说明进行操作。

  • 让我们创建一个没有分区的 Hive 类型连接器的 Flink 表

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • 将数据插入hive_table

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • 从 hive_table 中读取数据

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    注意

    Hive Warehouse Directory 位于在创建 Apache Flink 群集期间选择的存储帐户的指定容器中,可在目录 hive/warehouse 中找到/

  • 让我们创建带有分区的 Hive 连接器类型的 Flink 表。

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

重要

Apache Flink 中有一个已知的限制。 为分区选择最后的“n”列,而不考虑用户定义的分区列。 FLINK-32596 使用 Flink 方言创建 Hive 表时,分区键会出错。

参考