Flink/Delta Connector를 사용하는 방법
중요하다
AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 이 공지 와 함께에 대해 자세히 알아보세요.
워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.
중요하다
이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 프리뷰에 대한 추가 사용 약관은 베타, 프리뷰 또는 아직 일반 사용으로 릴리스되지 않은 Azure 기능에 적용되는 추가적인 법적 조건을 포함할 있습니다. 이 특정 미리 보기에 대한 정보는 Azure HDInsight on AKS 미리 보기 정보을 참조하세요. 질문이나 기능 제안을 하시려면, 자세한 내용을 포함하여 AskHDInsight에 요청을 제출해 주시고, Azure HDInsight Community를 팔로우하여 더 많은 업데이트를 받아 보세요.
Apache Flink와 Delta Lake를 함께 사용하면 안정적이고 확장 가능한 데이터 레이크하우스 아키텍처를 만들 수 있습니다. Flink/Delta Connector를 사용하면 ACID 트랜잭션을 사용하여 정확히 한 번 처리하는 델타 테이블에 데이터를 쓸 수 있습니다. 즉, 검사점에서 Flink 파이프라인을 다시 시작하는 경우에도 데이터 스트림이 일관되고 오류가 없습니다. Flink/Delta Connector는 데이터가 손실되거나 중복되지 않고 Flink 의미 체계와 일치하도록 합니다.
이 문서에서는 Flink-Delta 커넥터를 사용하는 방법을 알아봅니다.
- 델타 테이블에서 데이터를 읽습니다.
- 델타 테이블에 데이터를 씁니다.
- Power BI에서 쿼리합니다.
Flink/Delta 커넥터란?
Flink/Delta Connector는 Apache Flink 애플리케이션에서 델타 독립 실행형 JVM 라이브러리를 활용하는 델타 테이블로 데이터를 읽고 쓰는 JVM 라이브러리입니다. 커넥터는 정확히 한 번 전달 보장을 제공합니다.
Flink/Delta Connector에는 다음이 포함됩니다.
Apache Flink에서 델타 테이블에 데이터를 쓰기 위한 DeltaSink입니다. Apache Flink를 사용하여 델타 테이블을 읽기 위한 DeltaSource입니다.
Apache Flink-Delta 커넥터에는 다음이 포함됩니다.
커넥터 버전에 따라 다음 Apache Flink 버전에서 사용할 수 있습니다.
Connector's version Flink's version
0.4.x (Sink Only) 1.12.0 <= X <= 1.14.5
0.5.0 1.13.0 <= X <= 1.13.6
0.6.0 X >= 1.15.3
0.7.0 X >= 1.16.1 --- We use this in Flink 1.17.0
필수 구성 요소
- AKS의 HDInsight Flink 1.17.0 클러스터
- Flink-Delta 커넥터 0.7.0
- MSI를 사용하여 ADLS Gen2 액세스
- 개발을 위한 IntelliJ
델타 테이블에서 데이터를 읽습니다.
델타 원본은 다음과 같은 두 가지 모드 중 하나에서 작동할 수 있습니다.
특정 테이블 버전에 대해서만 델타 테이블의 콘텐츠를 읽으려는 일괄 처리 작업에 적합한 경계 모드입니다. DeltaSource.forBoundedRowData API를 사용하여 이 모드의 원본을 만듭니다.
연속 모드 스트리밍 작업에 적합하며, 델타 테이블에서 새 변경 내용 및 버전을 지속적으로 확인하려고 합니다. DeltaSource.forContinuousRowData API를 사용하여 이 모드의 원본을 만듭니다.
예: 경계 모드에서 모든 열을 읽기 위한 델타 테이블에 대한 원본 생성입니다. 일괄 처리 작업에 적합합니다. 이 예제에서는 최신 테이블 버전을 로드합니다.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
델타 싱크에 쓰기
델타 싱크는 현재 다음 Flink 메트릭을 노출합니다.
분할되지 않은 테이블에 대한 싱크 만들기
이 예제에서는 DeltaSink를 만들고 기존 org.apache.flink.streaming.api.datastream.DataStream
연결하는 방법을 보여 줍니다.
import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
전체 코드
델타 테이블에서 데이터를 읽고 다른 델타 테이블로 싱크합니다.
package contoso.example;
import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
public class DeltaSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
// Execute the Flink job
env.execute("Delta datasource and sink Example");
}
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
}
Maven Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkDeltaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop-version>3.3.4</hadoop-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-flink</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
jar을 패키지하고 Flink 클러스터에 제출하여 실행
AppMode 클러스터에서 작업 JAR 파일 정보를 전달합니다.
메모
ADLS를 읽고 쓰는 동안 항상
hadoop.classpath.enable
사용하도록 설정합니다.클러스터를 제출합니다. Flink UI에서 작업을 볼 수 있어야 합니다.
ADLS에서 결과를 찾습니다.
Power BI 통합
데이터가 델타 싱크에 있으면 Power BI 데스크톱에서 쿼리를 실행하고 보고서를 만들 수 있습니다.
Power BI 데스크톱을 열어 ADLS Gen2 커넥터를 사용하여 데이터를 가져옵니다.
스토리지 계정의 URL입니다.
원본에 대한 M 쿼리를 만들고 스토리지 계정에서 데이터를 쿼리하는 함수를 호출합니다.
데이터를 쉽게 사용할 수 있게 되면 보고서를 만들 수 있습니다.
참조
- Apache, Apache Flink, Flink 및 관련 오픈 소스 프로젝트 이름은 asF(Apache Software Foundation)의 상표입니다.