Compartir a través de


Enriquecer los eventos de Apache Kafka® con atributos de ADLS Gen2 con Apache Flink®

Nota:

Retiraremos Azure HDInsight en AKS el 31 de enero de 2025. Antes del 31 de enero de 2025, deberá migrar las cargas de trabajo a Microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo. Los clústeres restantes de la suscripción se detendrán y quitarán del host.

Solo el soporte técnico básico estará disponible hasta la fecha de retirada.

Importante

Esta funcionalidad actualmente está en su versión preliminar. En Términos de uso complementarios para las versiones preliminares de Microsoft Azure encontrará más términos legales que se aplican a las características de Azure que están en versión beta, en versión preliminar, o que todavía no se han lanzado con disponibilidad general. Para más información sobre esta versión preliminar específica, consulte la Información de Azure HDInsight sobre la versión preliminar de AKS. Para plantear preguntas o sugerencias sobre la característica, envíe una solicitud en AskHDInsight con los detalles y síganos en la comunidad de Azure HDInsight para obtener más actualizaciones.

En este artículo, puede aprender a enriquecer los eventos en tiempo real mediante la unión de una secuencia de Kafka con tabla en ADLS Gen2 mediante Flink Streaming. Usamos Flink Streaming API para unir eventos de HDInsight Kafka con atributos de ADLS Gen2. Además, usamos eventos unidos a atributos para conectarse a otro tema de Kafka.

Requisitos previos

Preparación de temas de Kafka

Estamos creando un tema denominado user_events.

  • El propósito es leer una secuencia de eventos en tiempo real de un tema de Kafka mediante Flink. Tenemos todos los eventos con los siguientes campos:
    user_id,
    item_id, 
    type, 
    timestamp, 
    

Kafka 3.2.0

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events --bootstrap-server wn0-contsk:9092
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events_output --bootstrap-server wn0-contsk:9092

Preparación del archivo en ADLS Gen2

Estamos creando un archivo denominado item attributes en nuestro almacenamiento

  • El propósito es leer un lote de item attributes de un archivo en ADLS Gen2. Cada elemento tiene los siguientes campos:
    item_id, 
    brand, 
    category, 
    timestamp, 
    

Captura de pantalla que muestra preparar un archivo de atributos de elemento por lotes en ADLS Gen2.

En este paso realizamos las siguientes actividades

  • Enriquecer el tema de user_events de Kafka mediante la combinación con item attributes desde un archivo en ADLS Gen2.
  • Insertamos el resultado de este paso, como una actividad de usuario enriquecida de eventos en un tema de Kafka.

Desarrollo de un proyecto Maven

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkKafkaJoinGen2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Unir el tema de Kafka y ADLS Gen2 File

KafkaJoinGen2Demo.java

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

public class KafkaJoinGen2Demo {
    public static void main(String[] args) throws Exception {
        // 1. Set up the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source configuration, update with your broker IPs
        String brokers = "<broker-ip>:9092,<broker-ip>:9092,<broker-ip>:9092";
        String inputTopic = "user_events";
        String outputTopic = "user_events_output";
        String groupId = "my_group";

        // 2. Register the cached file, update your container name and storage name
        env.registerCachedFile("abfs://<container-name>@<storagename>.dfs.core.windows.net/flink/data/item.txt", "file1");

        // 3. Read a stream of real-time user behavior event from a Kafka topic
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafkaData = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Parse Kafka source data
      DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
          @Override
          public Tuple4<String, String, String, String> map(String value) throws Exception {
              // Parse the line into a Tuple4
              String[] parts = value.split(",");
              if (parts.length < 4) {
                  // Log and skip malformed record
                  System.out.println("Malformed record: " + value);
                  return null;
              }
              return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
           }
       });

        // 4. Enrich the user activity events by joining the items' attributes from a file
        DataStream<Tuple7<String,String,String,String,String,String,String>> enrichedData = userEvents.map(new MyJoinFunction());

        // 5. Output the enriched user activity events to a Kafka topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .build();

        enrichedData.map(value -> value.toString()).sinkTo(sink);

        // 6. Execute the Flink job
        env.execute("Kafka Join Batch gen2 file, sink to another Kafka Topic");
    }

    private static class MyJoinFunction extends RichMapFunction<Tuple4<String,String,String,String>, Tuple7<String,String,String,String,String,String,String>> {
        private Map<String, Tuple4<String, String, String, String>> itemAttributes;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            // Read the cached file and parse its contents into a map
            itemAttributes = new HashMap<>();
            try (BufferedReader reader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile("file1")))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] parts = line.split(",");
                    itemAttributes.put(parts[0], new Tuple4<>(parts[0], parts[1], parts[2], parts[3]));
                }
            }
        }

        @Override
        public Tuple7<String,String,String,String,String,String,String> map(Tuple4<String,String,String,String> value) throws Exception {
            Tuple4<String, String, String, String> broadcastValue = itemAttributes.get(value.f1);
            if (broadcastValue != null) {
                return Tuple7.of(value.f0,value.f1,value.f2,value.f3,broadcastValue.f1,broadcastValue.f2,broadcastValue.f3);
            } else {
                return null;
            }
        }
    }
}

Estamos enviando el archivo jar empaquetado a Flink:

Captura de pantalla que muestra el empaquetado del jar y el envío a Flink con Kafka 3.2.

Captura de pantalla que muestra empaquetado del archivo jar y envío a Flink como paso posterior con Kafka 3.2.

Crear un tema de user_events en tiempo real en Kafka

Podemos generar eventos de comportamiento de usuario en tiempo real user_events en Kafka.

Captura de pantalla que muestra un evento de comportamiento del usuario en tiempo real en Kafka 3.2.

Consumir la itemAttributes unirse al user_events en Kafka

Ahora usamos itemAttributes en eventos de actividad de usuario de unión al sistema de archivos user_events.

Captura de pantalla en la que se muestran los eventos de actividad de usuario unidos a atributos de elemento en Kafka 3.2.

Seguimos produciendo y consumemos los atributos de elemento y actividad de usuario en las siguientes imágenes

Captura de pantalla que muestra cómo seguimos produciendo un evento de comportamiento de usuario en tiempo real en Kafka 3.2.

Captura de pantalla que muestra cómo seguimos usando los eventos de actividad de usuario unidos a atributos de elemento en Kafka.

Referencia