Apache Flink®를 사용하여 ADLS Gen2의 특성을 사용하여 Apache Kafka®의 이벤트 보강
중요하다
AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 에 대해공지를 통해 더 자세히 알아보세요.
워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.
중요하다
이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure Preview에 대한 추가 사용 약관은 베타, 미리 보기 또는 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 더 많은 법적 조건을 포함할 있습니다. 이 특정 미리 보기에 대한 정보는 Azure HDInsight on AKS 미리 보기 정보을 참조하세요. 질문이나 기능 제안이 있으신 경우, AskHDInsight에 요청을 제출해 주시고, 더 많은 업데이트를 원하시면 Azure HDInsight Community를 팔로우해 주세요.
이 문서에서는 Flink Streaming을 사용하여 Kafka의 스트림을 ADLS Gen2의 테이블과 조인하여 실시간 이벤트를 보강하는 방법을 알아볼 수 있습니다. Flink Streaming API를 사용하여 HDInsight Kafka의 이벤트를 ADLS Gen2의 특성과 조인합니다. 또한 특성 조인 이벤트를 사용하여 다른 Kafka 토픽으로 싱크합니다.
필수 구성 요소
- AKS HDInsight의 Flink 클러스터
-
HDInsight의 Kafka 클러스터
- HDInsight 클러스터와 AKS의 HDInsight가 동일한 VNet에 존재하도록 하려면 'Kafka를 이용한 HDInsight 사용법'에 설명된 대로 네트워크 설정을 주의 깊게 처리해야 합니다.
- 이 데모에서는 AKS의 HDInsight와 동일한 VNet에서 Maven 프로젝트 개발 환경으로 Window VM을 사용합니다.
Kafka 항목 준비
user_events
라는 토픽을 만들고 있습니다.
- Flink를 사용하여 Kafka 토픽에서 실시간 이벤트 스트림을 읽는 것이 목적입니다. 다음과 같은 필드가 있는 모든 이벤트가 있습니다.
user_id, item_id, type, timestamp,
Kafka 3.2.0
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events --bootstrap-server wn0-contsk:9092
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events_output --bootstrap-server wn0-contsk:9092
ADLS Gen2에서 파일 준비
스토리지에 item attributes
이라는 파일을 만들고 있습니다.
- ADLS Gen2의 파일에서
item attributes
일괄 데이터를 읽는 것이 목적입니다. 각 항목에는 다음 필드가 있습니다.item_id, brand, category, timestamp,
ADLS Gen2에서 일괄 처리 항목 특성 파일 준비를 보여 주는
Apache Flink 작업 개발
이 단계에서는 다음 작업을 수행합니다.
- ADLS Gen2의 파일에서
item attributes
을 가져와 Kafka의user_events
주제를 보강합니다. - 이 단계의 결과를 강화된 사용자 활동 이벤트로 Kafka 토픽에 전송합니다.
Maven 프로젝트 개발
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkKafkaJoinGen2</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Kafka 토픽 을(를) ADLS Gen2 파일과 연결하기
KafkaJoinGen2Demo.java
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
public class KafkaJoinGen2Demo {
public static void main(String[] args) throws Exception {
// 1. Set up the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka source configuration, update with your broker IPs
String brokers = "<broker-ip>:9092,<broker-ip>:9092,<broker-ip>:9092";
String inputTopic = "user_events";
String outputTopic = "user_events_output";
String groupId = "my_group";
// 2. Register the cached file, update your container name and storage name
env.registerCachedFile("abfs://<container-name>@<storagename>.dfs.core.windows.net/flink/data/item.txt", "file1");
// 3. Read a stream of real-time user behavior event from a Kafka topic
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics(inputTopic)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafkaData = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// Parse Kafka source data
DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
@Override
public Tuple4<String, String, String, String> map(String value) throws Exception {
// Parse the line into a Tuple4
String[] parts = value.split(",");
if (parts.length < 4) {
// Log and skip malformed record
System.out.println("Malformed record: " + value);
return null;
}
return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
}
});
// 4. Enrich the user activity events by joining the items' attributes from a file
DataStream<Tuple7<String,String,String,String,String,String,String>> enrichedData = userEvents.map(new MyJoinFunction());
// 5. Output the enriched user activity events to a Kafka topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(outputTopic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build();
enrichedData.map(value -> value.toString()).sinkTo(sink);
// 6. Execute the Flink job
env.execute("Kafka Join Batch gen2 file, sink to another Kafka Topic");
}
private static class MyJoinFunction extends RichMapFunction<Tuple4<String,String,String,String>, Tuple7<String,String,String,String,String,String,String>> {
private Map<String, Tuple4<String, String, String, String>> itemAttributes;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Read the cached file and parse its contents into a map
itemAttributes = new HashMap<>();
try (BufferedReader reader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile("file1")))) {
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(",");
itemAttributes.put(parts[0], new Tuple4<>(parts[0], parts[1], parts[2], parts[3]));
}
}
}
@Override
public Tuple7<String,String,String,String,String,String,String> map(Tuple4<String,String,String,String> value) throws Exception {
Tuple4<String, String, String, String> broadcastValue = itemAttributes.get(value.f1);
if (broadcastValue != null) {
return Tuple7.of(value.f0,value.f1,value.f2,value.f3,broadcastValue.f1,broadcastValue.f2,broadcastValue.f3);
} else {
return null;
}
}
}
}
패키지 jar를 준비하여 Apache Flink에 업로드
패키지된 jar를 Flink에 제출하고 있습니다.
jar를 패키징하고 Kafka 3.2를 사용하여 Flink에 제출하는 과정을 보여주는 스크린샷입니다.
Kafka에서 실시간 user_events
항목 생성
Kafka에서 실시간 사용자 동작 이벤트 user_events
생성할 수 있습니다.
Kafka에서 itemAttributes
을 user_events
과 조인된 상태로 소비하기
이제 파일 시스템 조인 사용자 활동 이벤트 user_events
itemAttributes
사용하고 있습니다.
다음 이미지에서 사용자 활동 및 항목 특성을 계속 생성하고 사용합니다.
참조
- Flink의 예제
- Apache Flink 웹 사이트
- Apache, Apache Kafka, Kafka, Apache Flink, Flink 및 관련 오픈 소스 프로젝트 이름은 Apache Software Foundation(ASF)의 상표입니다.