如何使用 Flink/Delta 连接器

重要

AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 了解此公告的详细信息

需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。

重要

此功能目前以预览版提供。 Microsoft Azure 预览版补充使用条款 包括适用于 beta 版、预览版或尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 AKS 上的 Azure HDInsight 预览信息。 有关问题或功能建议,请在 AskHDInsight 上提交请求,提供详细信息,并关注 Azure HDInsight 社区以获取更多更新。

通过将 Apache Flink 和 Delta Lake 结合使用,可以创建可靠且可缩放的 Data Lakehouse 体系结构。 Flink/Delta 连接器允许通过 ACID 事务将数据写入 Delta 表,并确保精确一次的处理。 这意味着数据流一致且无错误,即使从检查点重启 Flink 管道也是如此。 Flink/Delta 连接器可确保数据不会丢失或复制,并且它与 Flink 语义匹配。

本文介绍如何使用 Flink-Delta 连接器。

  • 从 Delta 表读取数据。
  • 将数据写入增量表。
  • 在 Power BI 中查询它。

什么是 Flink/Delta 连接器

Flink/Delta 连接器是一个 JVM 库,用于使用 Delta 独立 JVM 库从 Apache Flink 应用程序读取和写入到 Delta 表的数据。 连接器提供一次性的交付保证。

Flink/Delta 连接器包括:

用于将数据从 Apache Flink 写入 Delta 表的 DeltaSink。 用于使用 Apache Flink 读取 Delta 表的 DeltaSource。

Apache Flink-Delta 连接器包括:

根据连接器的版本,可以将它用于以下 Apache Flink 版本:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

先决条件

  • AKS 上的 HDInsight Flink 1.17.0 群集
  • Flink-Delta 连接器 0.7.0
  • 使用 MSI 访问 ADLS Gen2
  • 用于开发的 IntelliJ

从增量表读取数据

增量源可以在以下两种模式之一中工作,具体描述如下。

  • 有界模式适用于批处理作业,这种模式下我们希望仅读取特定表版本的Delta表内容。 使用 DeltaSource.forBoundedRowData API 创建此模式的源。

  • 适用于流式处理作业的连续模式,我们希望在其中持续检查 Delta 表是否有新的更改和版本。 使用 DeltaSource.forContinuousRowData API 创建此模式的源。

示例:为 Delta 表创建源,以读取绑定模式下的所有列。 适用于批处理作业。 此示例加载最新的表版本。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

写入到 Delta 接收器

Delta Sink 当前公开以下 Flink 指标:

显示 Flink 指标表的屏幕截图。

为非分区表创建接收器

在此示例中,我们演示如何创建 DeltaSink 并将其连接到现有 org.apache.flink.streaming.api.datastream.DataStream

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

完整代码

从增量表读取数据,并写入到另一个增量表。

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. 将 jar 上传到 ABFS。 显示应用模式 jar 文件的 屏幕截图。

  2. 在应用模式集群中传递作业 jar 文件的信息。

    显示群集配置的 屏幕截图。

    注意

    始终在读取/写入 ADLS 时启用 hadoop.classpath.enable

  3. 提交集群后,您就可以在 Flink UI 中看到作业。

    显示 Flink 仪表板的屏幕截图。

  4. 在 ADLS 中查找结果。

    显示输出的 屏幕截图。

Power BI 集成

数据进入 Delta Sink 后,可以在 Power BI Desktop 中运行查询并创建报表。

  1. 打开 Power BI Desktop,使用 ADLS Gen2 连接器获取数据。

    屏幕截图显示 Power BI Desktop。

    屏幕截图显示 ADLSGen 2 连接器。

  2. 存储帐户的 URL。

    显示存储帐户 URL 的屏幕截图。

    屏幕截图显示 ADLS Gen2 详细信息。

  3. 为源创建 M 查询并调用函数,该函数从存储帐户查询数据。

  4. 数据准备就绪后,可以创建报表。

    屏幕截图显示如何创建报表。

引用