다음을 통해 공유


Apache Flink®를 사용하여 ADLS Gen2의 특성을 사용하여 Apache Kafka®의 이벤트 보강

중요하다

AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 에 대해공지를 통해 더 자세히 알아보세요.

워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.

중요하다

이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure Preview에 대한 추가 사용 약관은 베타, 미리 보기 또는 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 더 많은 법적 조건을 포함할 있습니다. 이 특정 미리 보기에 대한 정보는 Azure HDInsight on AKS 미리 보기 정보을 참조하세요. 질문이나 기능 제안이 있으신 경우, AskHDInsight에 요청을 제출해 주시고, 더 많은 업데이트를 원하시면 Azure HDInsight Community를 팔로우해 주세요.

이 문서에서는 Flink Streaming을 사용하여 Kafka의 스트림을 ADLS Gen2의 테이블과 조인하여 실시간 이벤트를 보강하는 방법을 알아볼 수 있습니다. Flink Streaming API를 사용하여 HDInsight Kafka의 이벤트를 ADLS Gen2의 특성과 조인합니다. 또한 특성 조인 이벤트를 사용하여 다른 Kafka 토픽으로 싱크합니다.

필수 구성 요소

Kafka 항목 준비

user_events라는 토픽을 만들고 있습니다.

  • Flink를 사용하여 Kafka 토픽에서 실시간 이벤트 스트림을 읽는 것이 목적입니다. 다음과 같은 필드가 있는 모든 이벤트가 있습니다.
    user_id,
    item_id, 
    type, 
    timestamp, 
    

Kafka 3.2.0

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events --bootstrap-server wn0-contsk:9092
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events_output --bootstrap-server wn0-contsk:9092

ADLS Gen2에서 파일 준비

스토리지에 item attributes이라는 파일을 만들고 있습니다.

  • ADLS Gen2의 파일에서 item attributes 일괄 데이터를 읽는 것이 목적입니다. 각 항목에는 다음 필드가 있습니다.
    item_id, 
    brand, 
    category, 
    timestamp, 
    

ADLS Gen2에서 일괄 처리 항목 특성 파일 준비를 보여 주는 스크린샷

이 단계에서는 다음 작업을 수행합니다.

  • ADLS Gen2의 파일에서 item attributes을 가져와 Kafka의 user_events 주제를 보강합니다.
  • 이 단계의 결과를 강화된 사용자 활동 이벤트로 Kafka 토픽에 전송합니다.

Maven 프로젝트 개발

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkKafkaJoinGen2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Kafka 토픽 을(를) ADLS Gen2 파일과 연결하기

KafkaJoinGen2Demo.java

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

public class KafkaJoinGen2Demo {
    public static void main(String[] args) throws Exception {
        // 1. Set up the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source configuration, update with your broker IPs
        String brokers = "<broker-ip>:9092,<broker-ip>:9092,<broker-ip>:9092";
        String inputTopic = "user_events";
        String outputTopic = "user_events_output";
        String groupId = "my_group";

        // 2. Register the cached file, update your container name and storage name
        env.registerCachedFile("abfs://<container-name>@<storagename>.dfs.core.windows.net/flink/data/item.txt", "file1");

        // 3. Read a stream of real-time user behavior event from a Kafka topic
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafkaData = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Parse Kafka source data
      DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
          @Override
          public Tuple4<String, String, String, String> map(String value) throws Exception {
              // Parse the line into a Tuple4
              String[] parts = value.split(",");
              if (parts.length < 4) {
                  // Log and skip malformed record
                  System.out.println("Malformed record: " + value);
                  return null;
              }
              return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
           }
       });

        // 4. Enrich the user activity events by joining the items' attributes from a file
        DataStream<Tuple7<String,String,String,String,String,String,String>> enrichedData = userEvents.map(new MyJoinFunction());

        // 5. Output the enriched user activity events to a Kafka topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .build();

        enrichedData.map(value -> value.toString()).sinkTo(sink);

        // 6. Execute the Flink job
        env.execute("Kafka Join Batch gen2 file, sink to another Kafka Topic");
    }

    private static class MyJoinFunction extends RichMapFunction<Tuple4<String,String,String,String>, Tuple7<String,String,String,String,String,String,String>> {
        private Map<String, Tuple4<String, String, String, String>> itemAttributes;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            // Read the cached file and parse its contents into a map
            itemAttributes = new HashMap<>();
            try (BufferedReader reader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile("file1")))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] parts = line.split(",");
                    itemAttributes.put(parts[0], new Tuple4<>(parts[0], parts[1], parts[2], parts[3]));
                }
            }
        }

        @Override
        public Tuple7<String,String,String,String,String,String,String> map(Tuple4<String,String,String,String> value) throws Exception {
            Tuple4<String, String, String, String> broadcastValue = itemAttributes.get(value.f1);
            if (broadcastValue != null) {
                return Tuple7.of(value.f0,value.f1,value.f2,value.f3,broadcastValue.f1,broadcastValue.f2,broadcastValue.f3);
            } else {
                return null;
            }
        }
    }
}

패키지된 jar를 Flink에 제출하고 있습니다.

jar를 패키징하고 Kafka 3.2를 사용하여 Flink에 제출하는 과정을 보여주는 스크린샷입니다.

Jar 패키징을 보여 주는 스크린샷 및 Kafka 3.2의 추가 단계로 Flink에 제출합니다.

Kafka에서 실시간 user_events 항목 생성

Kafka에서 실시간 사용자 동작 이벤트 user_events 생성할 수 있습니다.

Kafka 3.2의 실시간 사용자 동작 이벤트를 보여 주는 스크린샷

Kafka에서 itemAttributesuser_events과 조인된 상태로 소비하기

이제 파일 시스템 조인 사용자 활동 이벤트 user_eventsitemAttributes 사용하고 있습니다.

Kafka 3.2에서 항목 특성에 조인된 사용자 활동 이벤트를 소비하는 스크린샷

다음 이미지에서 사용자 활동 및 항목 특성을 계속 생성하고 사용합니다.

Kafka 3.2에서 실시간 사용자 동작 이벤트를 계속 생성하는 방법을 보여 주는 스크린샷

Kafka에서 항목 특성에 조인된 사용자 활동 이벤트를 계속 사용하는 방법을 보여 주는 스크린샷

참조