다음을 통해 공유


Flink/Delta Connector를 사용하는 방법

중요하다

AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 이 공지 와 함께에 대해 자세히 알아보세요.

워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.

중요하다

이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 프리뷰에 대한 추가 사용 약관은 베타, 프리뷰 또는 아직 일반 사용으로 릴리스되지 않은 Azure 기능에 적용되는 추가적인 법적 조건을 포함할 있습니다. 이 특정 미리 보기에 대한 정보는 Azure HDInsight on AKS 미리 보기 정보을 참조하세요. 질문이나 기능 제안을 하시려면, 자세한 내용을 포함하여 AskHDInsight에 요청을 제출해 주시고, Azure HDInsight Community를 팔로우하여 더 많은 업데이트를 받아 보세요.

Apache Flink와 Delta Lake를 함께 사용하면 안정적이고 확장 가능한 데이터 레이크하우스 아키텍처를 만들 수 있습니다. Flink/Delta Connector를 사용하면 ACID 트랜잭션을 사용하여 정확히 한 번 처리하는 델타 테이블에 데이터를 쓸 수 있습니다. 즉, 검사점에서 Flink 파이프라인을 다시 시작하는 경우에도 데이터 스트림이 일관되고 오류가 없습니다. Flink/Delta Connector는 데이터가 손실되거나 중복되지 않고 Flink 의미 체계와 일치하도록 합니다.

이 문서에서는 Flink-Delta 커넥터를 사용하는 방법을 알아봅니다.

  • 델타 테이블에서 데이터를 읽습니다.
  • 델타 테이블에 데이터를 씁니다.
  • Power BI에서 쿼리합니다.

Flink/Delta 커넥터란?

Flink/Delta Connector는 Apache Flink 애플리케이션에서 델타 독립 실행형 JVM 라이브러리를 활용하는 델타 테이블로 데이터를 읽고 쓰는 JVM 라이브러리입니다. 커넥터는 정확히 한 번 전달 보장을 제공합니다.

Flink/Delta Connector에는 다음이 포함됩니다.

Apache Flink에서 델타 테이블에 데이터를 쓰기 위한 DeltaSink입니다. Apache Flink를 사용하여 델타 테이블을 읽기 위한 DeltaSource입니다.

Apache Flink-Delta 커넥터에는 다음이 포함됩니다.

커넥터 버전에 따라 다음 Apache Flink 버전에서 사용할 수 있습니다.

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

필수 구성 요소

  • AKS의 HDInsight Flink 1.17.0 클러스터
  • Flink-Delta 커넥터 0.7.0
  • MSI를 사용하여 ADLS Gen2 액세스
  • 개발을 위한 IntelliJ

델타 테이블에서 데이터를 읽습니다.

델타 원본은 다음과 같은 두 가지 모드 중 하나에서 작동할 수 있습니다.

  • 특정 테이블 버전에 대해서만 델타 테이블의 콘텐츠를 읽으려는 일괄 처리 작업에 적합한 경계 모드입니다. DeltaSource.forBoundedRowData API를 사용하여 이 모드의 원본을 만듭니다.

  • 연속 모드 스트리밍 작업에 적합하며, 델타 테이블에서 새 변경 내용 및 버전을 지속적으로 확인하려고 합니다. DeltaSource.forContinuousRowData API를 사용하여 이 모드의 원본을 만듭니다.

예: 경계 모드에서 모든 열을 읽기 위한 델타 테이블에 대한 원본 생성입니다. 일괄 처리 작업에 적합합니다. 이 예제에서는 최신 테이블 버전을 로드합니다.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

델타 싱크에 쓰기

델타 싱크는 현재 다음 Flink 메트릭을 노출합니다.

Flink 메트릭에 대한 테이블을 보여 주는 스크린샷

분할되지 않은 테이블에 대한 싱크 만들기

이 예제에서는 DeltaSink를 만들고 기존 org.apache.flink.streaming.api.datastream.DataStream연결하는 방법을 보여 줍니다.

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

전체 코드

델타 테이블에서 데이터를 읽고 다른 델타 테이블로 싱크합니다.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. ABFS에 jar을 업로드합니다. 앱 모드 jar 파일을 보여주는 스크린샷

  2. AppMode 클러스터에서 작업 JAR 파일 정보를 전달합니다.

    클러스터 구성을 보여 주는 스크린샷

    메모

    ADLS를 읽고 쓰는 동안 항상 hadoop.classpath.enable 사용하도록 설정합니다.

  3. 클러스터를 제출합니다. Flink UI에서 작업을 볼 수 있어야 합니다.

    Flink 대시보드를 보여 주는 스크린샷

  4. ADLS에서 결과를 찾습니다.

    출력을 보여 주는 스크린샷

Power BI 통합

데이터가 델타 싱크에 있으면 Power BI 데스크톱에서 쿼리를 실행하고 보고서를 만들 수 있습니다.

  1. Power BI 데스크톱을 열어 ADLS Gen2 커넥터를 사용하여 데이터를 가져옵니다.

    스크린샷은 Power BI 데스크톱을 보여줍니다.

    스크린샷은 ADLSGen 2 커넥터를 보여줍니다.

  2. 스토리지 계정의 URL입니다.

    스토리지 계정의 URL을 보여 주는 스크린샷

    스크린샷은 ADLS Gen2 세부 정보를 보여줍니다.

  3. 원본에 대한 M 쿼리를 만들고 스토리지 계정에서 데이터를 쿼리하는 함수를 호출합니다.

  4. 데이터를 쉽게 사용할 수 있게 되면 보고서를 만들 수 있습니다.

    스크린샷은 보고서를 만드는 방법을 보여줍니다.

참조