Udostępnij za pośrednictwem


Wzbogacanie zdarzeń z platformy Apache Kafka® za pomocą atrybutów usługi ADLS Gen2 za pomocą narzędzia Apache Flink®

Ważny

Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej z tego ogłoszenia.

Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.

Ważny

Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe warunki użytkowania platformy Microsoft Azure zawierają więcej warunków prawnych, które dotyczą funkcji platformy Azure w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej Azure HDInsight na AKS, zobacz informacje o wersji zapoznawczej. W przypadku pytań lub sugestii dotyczących funkcji, prosimy o przesłanie zapytania na AskHDInsight z podanymi szczegółami oraz śledzenie nas po więcej aktualności w Społeczności Azure HDInsight.

W tym artykule dowiesz się, jak można wzbogacić zdarzenia w czasie rzeczywistym, łącząc strumień z platformy Kafka z tabelą znajdującą się w usłudze ADLS Gen2 przy użyciu przesyłania strumieniowego Flink. Używamy Flink Streaming API do łączenia zdarzeń z HDInsight Kafka z atrybutami z ADLS Gen2. Ponadto do ujścia do innego tematu platformy Kafka używamy zdarzeń przyłączonych do atrybutów.

Warunki wstępne

Przygotowanie tematu platformy Kafka

Tworzymy temat o nazwie user_events.

  • Celem jest odczytywanie strumienia zdarzeń w czasie rzeczywistym z tematu platformy Kafka przy użyciu funkcji Flink. Każde zdarzenie ma następujące pola:
    user_id,
    item_id, 
    type, 
    timestamp, 
    

Kafka 3.2.0

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events --bootstrap-server wn0-contsk:9092
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events_output --bootstrap-server wn0-contsk:9092

Przygotowywanie pliku w usłudze ADLS Gen2

Tworzymy plik o nazwie item attributes w magazynie

  • Celem jest odczytanie partii item attributes z pliku w usłudze ADLS Gen2. Każdy element ma następujące pola:
    item_id, 
    brand, 
    category, 
    timestamp, 
    

Zrzut ekranu przedstawiający przygotowanie pliku atrybutów elementu wsadowego w usłudze ADLS Gen2.

W tym kroku wykonamy następujące działania

  • Wzbogać temat user_events z platformy Kafka poprzez połączenie z item attributes z pliku w usłudze ADLS Gen2.
  • Wypychamy wynik tego kroku jako wzbogaconą aktywność użytkownika dotyczącą zdarzeń do tematu w systemie Kafka.

Opracowywanie projektu Maven

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkKafkaJoinGen2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Dołącz do tematu Kafka z plikiem ADLS Gen2

KafkaJoinGen2Demo.java

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

public class KafkaJoinGen2Demo {
    public static void main(String[] args) throws Exception {
        // 1. Set up the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source configuration, update with your broker IPs
        String brokers = "<broker-ip>:9092,<broker-ip>:9092,<broker-ip>:9092";
        String inputTopic = "user_events";
        String outputTopic = "user_events_output";
        String groupId = "my_group";

        // 2. Register the cached file, update your container name and storage name
        env.registerCachedFile("abfs://<container-name>@<storagename>.dfs.core.windows.net/flink/data/item.txt", "file1");

        // 3. Read a stream of real-time user behavior event from a Kafka topic
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafkaData = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Parse Kafka source data
      DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
          @Override
          public Tuple4<String, String, String, String> map(String value) throws Exception {
              // Parse the line into a Tuple4
              String[] parts = value.split(",");
              if (parts.length < 4) {
                  // Log and skip malformed record
                  System.out.println("Malformed record: " + value);
                  return null;
              }
              return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
           }
       });

        // 4. Enrich the user activity events by joining the items' attributes from a file
        DataStream<Tuple7<String,String,String,String,String,String,String>> enrichedData = userEvents.map(new MyJoinFunction());

        // 5. Output the enriched user activity events to a Kafka topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .build();

        enrichedData.map(value -> value.toString()).sinkTo(sink);

        // 6. Execute the Flink job
        env.execute("Kafka Join Batch gen2 file, sink to another Kafka Topic");
    }

    private static class MyJoinFunction extends RichMapFunction<Tuple4<String,String,String,String>, Tuple7<String,String,String,String,String,String,String>> {
        private Map<String, Tuple4<String, String, String, String>> itemAttributes;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            // Read the cached file and parse its contents into a map
            itemAttributes = new HashMap<>();
            try (BufferedReader reader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile("file1")))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] parts = line.split(",");
                    itemAttributes.put(parts[0], new Tuple4<>(parts[0], parts[1], parts[2], parts[3]));
                }
            }
        }

        @Override
        public Tuple7<String,String,String,String,String,String,String> map(Tuple4<String,String,String,String> value) throws Exception {
            Tuple4<String, String, String, String> broadcastValue = itemAttributes.get(value.f1);
            if (broadcastValue != null) {
                return Tuple7.of(value.f0,value.f1,value.f2,value.f3,broadcastValue.f1,broadcastValue.f2,broadcastValue.f3);
            } else {
                return null;
            }
        }
    }
}

Przesyłamy spakowany plik jar do Flink:

zrzut ekranu przedstawiający pakowanie pliku jar i przesyłanie do Flinka za pomocą platformy Kafka 3.2.

Zrzut ekranu przedstawiający tworzenie pakietów pliku JAR i przesyłanie do Flink jako kolejny krok z użyciem Kafki 3.2.

Tworzenie tematu user_events w czasie rzeczywistym na platformie Kafka

Możemy utworzyć zdarzenie zachowania użytkownika w czasie rzeczywistym user_events na platformie Kafka.

Zrzut ekranu przedstawiający zdarzenie zachowania użytkownika w czasie rzeczywistym na platformie Kafka 3.2.

Konsumowanie itemAttributes połączonych z user_events w Kafce

Teraz używamy itemAttributes do zdarzeń aktywności użytkownika związanych z łączeniem systemu plików user_events.

Zrzut ekranu przedstawiający zdarzenia aktywności użytkownika przyłączone do atrybutów elementu na platformie Kafka 3.2.

Kontynuujemy tworzenie i używanie atrybutów działań użytkownika i elementów na poniższych obrazach

Zrzut ekranu pokazujący, jak kontynuujemy generowanie zdarzenia zachowania użytkownika w czasie rzeczywistym w systemie Kafka 3.2.

Zrzut ekranu przedstawiający sposób dalszego korzystania ze zdarzeń aktywności użytkownika dołączonych do atrybutów elementu na platformie Kafka.

Odniesienie