Partager via


Enrichir les événements d’Apache Kafka® avec des attributs d’ADLS Gen2 avec Apache Flink®

Important

Azure HDInsight sur AKS a été mis hors service le 31 janvier 2025. En savoir plus avec cette annonce.

Vous devez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent pour éviter l’arrêt brusque de vos charges de travail.

Important

Cette fonctionnalité est actuellement en préversion. Les Conditions d’utilisation supplémentaires pour les préversions Microsoft Azure incluent des termes juridiques supplémentaires qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou qui ne sont pas encore publiées en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez informations sur Azure HDInsight sur AKS en préversion. Pour des questions ou des suggestions de fonctionnalités, envoyez une demande sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur Communauté Azure HDInsight.

Dans cet article, vous pouvez découvrir comment enrichir les événements en temps réel en joignant un flux à partir de Kafka avec une table sur ADLS Gen2 à l’aide de Flink Streaming. Nous utilisons l’API Flink Streaming pour joindre des événements de HDInsight Kafka avec des attributs d’ADLS Gen2. En outre, nous utilisons des événements reliés par des attributs pour transférer vers un autre sujet Kafka.

Conditions préalables

  • cluster Flink sur HDInsight sur AKS
  • cluster Kafka sur HDInsight
    • Vérifiez que les paramètres réseau sont pris en charge comme décrit dans Utilisation de Kafka sur HDInsight pour vous assurer que HDInsight sur les clusters AKS et HDInsight se trouvent dans le même réseau virtuel
  • Pour cette démonstration, nous utilisons une machine virtuelle Window comme environnement de développement de projet maven dans le même réseau virtuel que HDInsight sur AKS

Préparation des rubriques Kafka

Nous créons une rubrique appelée user_events.

  • L’objectif est de lire un flux d’événements en temps réel à partir d’une rubrique Kafka à l’aide de Flink. Nous avons tous les événements avec les champs suivants :
    user_id,
    item_id, 
    type, 
    timestamp, 
    

Kafka 3.2.0

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events --bootstrap-server wn0-contsk:9092
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events_output --bootstrap-server wn0-contsk:9092

Préparer le fichier sur ADLS Gen2

Nous créons un fichier appelé item attributes dans notre stockage

  • L’objectif est de lire un lot de item attributes à partir d’un fichier sur ADLS Gen2. Chaque élément a les champs suivants :
    item_id, 
    brand, 
    category, 
    timestamp, 
    

Capture d’écran montrant la préparation d’un fichier d’attributs d’éléments de lot sur ADLS Gen2.

Dans cette étape, nous effectuons les activités suivantes

  • Enrichissez la rubrique user_events à partir de Kafka en vous joignant à item attributes à partir d’un fichier sur ADLS Gen2.
  • Nous transférons le résultat de cette étape, sous forme d'activité utilisateur enrichie d'événements, dans un topic Kafka.

Développer un projet Maven

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkKafkaJoinGen2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Associer le sujet Kafka avec le fichier ADLS Gen2

KafkaJoinGen2Demo.java

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

public class KafkaJoinGen2Demo {
    public static void main(String[] args) throws Exception {
        // 1. Set up the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source configuration, update with your broker IPs
        String brokers = "<broker-ip>:9092,<broker-ip>:9092,<broker-ip>:9092";
        String inputTopic = "user_events";
        String outputTopic = "user_events_output";
        String groupId = "my_group";

        // 2. Register the cached file, update your container name and storage name
        env.registerCachedFile("abfs://<container-name>@<storagename>.dfs.core.windows.net/flink/data/item.txt", "file1");

        // 3. Read a stream of real-time user behavior event from a Kafka topic
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafkaData = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Parse Kafka source data
      DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
          @Override
          public Tuple4<String, String, String, String> map(String value) throws Exception {
              // Parse the line into a Tuple4
              String[] parts = value.split(",");
              if (parts.length < 4) {
                  // Log and skip malformed record
                  System.out.println("Malformed record: " + value);
                  return null;
              }
              return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
           }
       });

        // 4. Enrich the user activity events by joining the items' attributes from a file
        DataStream<Tuple7<String,String,String,String,String,String,String>> enrichedData = userEvents.map(new MyJoinFunction());

        // 5. Output the enriched user activity events to a Kafka topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .build();

        enrichedData.map(value -> value.toString()).sinkTo(sink);

        // 6. Execute the Flink job
        env.execute("Kafka Join Batch gen2 file, sink to another Kafka Topic");
    }

    private static class MyJoinFunction extends RichMapFunction<Tuple4<String,String,String,String>, Tuple7<String,String,String,String,String,String,String>> {
        private Map<String, Tuple4<String, String, String, String>> itemAttributes;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            // Read the cached file and parse its contents into a map
            itemAttributes = new HashMap<>();
            try (BufferedReader reader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile("file1")))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] parts = line.split(",");
                    itemAttributes.put(parts[0], new Tuple4<>(parts[0], parts[1], parts[2], parts[3]));
                }
            }
        }

        @Override
        public Tuple7<String,String,String,String,String,String,String> map(Tuple4<String,String,String,String> value) throws Exception {
            Tuple4<String, String, String, String> broadcastValue = itemAttributes.get(value.f1);
            if (broadcastValue != null) {
                return Tuple7.of(value.f0,value.f1,value.f2,value.f3,broadcastValue.f1,broadcastValue.f2,broadcastValue.f3);
            } else {
                return null;
            }
        }
    }
}

Nous envoyons le fichier jar empaqueté à Flink :

Capture d’écran montrant l’empaquetage du fichier jar et envoyer à Flink avec Kafka 3.2.

Capture d’écran montrant l’empaquetage du fichier jar et envoyer à Flink comme étape ultérieure avec Kafka 3.2.

Produire un sujet user_events en temps réel sur Kafka

Nous pouvons produire un événement de comportement utilisateur en temps réel user_events dans Kafka.

Capture d’écran montrant un événement de comportement utilisateur en temps réel sur Kafka 3.2.

Utiliser la jonction itemAttributes avec user_events sur Kafka

Nous utilisons maintenant itemAttributes sur les événements d’activité utilisateur de jointure du système de fichiers user_events.

Capture d’écran montrant la consommation des événements d'activité utilisateur associés aux attributs d'éléments sur Kafka 3.2.

Nous continuons à produire et à consommer l'activité des utilisateurs et les attributs des éléments dans les images suivantes.

Capture d’écran montrant comment nous continuons à produire un événement de comportement utilisateur en temps réel sur Kafka 3.2.

Capture d’écran montrant comment nous continuons à consommer les événements d’activité utilisateur joints aux attributs d’élément sur Kafka.

Référence