Enriquecer los eventos de Apache Kafka® con atributos de ADLS Gen2 con Apache Flink®
Importante
Azure HDInsight en AKS se retiró el 31 de enero de 2025. Conoce más con este anuncio.
Debe migrar las cargas de trabajo a microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo.
Importante
Esta característica está actualmente en versión preliminar. Los Términos de uso complementarios para las versiones preliminares de Microsoft Azure incluyen más términos legales que se aplican a las características de Azure que se encuentran en versión beta, en versión preliminar o, de lo contrario, aún no se han publicado en disponibilidad general. Para obtener información sobre esta versión preliminar específica, consulte información de la versión preliminar de Azure HDInsight en AKS. Para preguntas o sugerencias de características, envíe una solicitud en AskHDInsight con los detalles y síganos para obtener más actualizaciones sobre Comunidad de Azure HDInsight.
En este artículo, puede aprender a enriquecer los eventos en tiempo real mediante la unión de una secuencia de Kafka con una tabla en ADLS Gen2 usando Flink Streaming. Usamos Flink Streaming API para unir eventos de HDInsight Kafka con atributos de ADLS Gen2. Además, usamos eventos con atributos unidos para redirigir a otro tema de Kafka.
Prerrequisitos
- clúster de Flink en HDInsight sobre AKS
-
clúster de Kafka en HDInsight
- Asegúrese de que la configuración de red se maneje según lo descrito en Uso de Kafka en HDInsight para asegurarse de que HDInsight en AKS y los clústeres de HDInsight estén en la misma VNet.
- Para esta demostración, estamos usando una máquina virtual de Windows como entorno de desarrollo de proyectos Maven en la misma VNet que HDInsight en AKS.
Preparación de temas de Kafka
Estamos creando un tema denominado user_events
.
- El propósito es leer una secuencia de eventos en tiempo real de un tema de Kafka mediante Flink. Contamos con todos los eventos que tienen los siguientes campos:
user_id, item_id, type, timestamp,
kafka 3.2.0
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events --bootstrap-server wn0-contsk:9092
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events_output --bootstrap-server wn0-contsk:9092
Preparación del archivo en ADLS Gen2
Estamos creando un archivo denominado item attributes
en nuestro almacenamiento
- El propósito es leer un lote de
item attributes
de un archivo en ADLS Gen2. Cada elemento tiene los siguientes campos:item_id, brand, category, timestamp,
Desarrollo del trabajo de Apache Flink
En este paso, realizamos las siguientes actividades.
- Enriquecer el tema
user_events
de Kafka mediante la combinación conitem attributes
desde un archivo en ADLS Gen2. - Publicamos el resultado de este paso como una actividad de usuario enriquecida con eventos en un tópico de Kafka.
Desarrollo de un proyecto de Maven
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkKafkaJoinGen2</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Conectar el tópico de Kafka con el archivo de ADLS Gen2
KafkaJoinGen2Demo.java
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
public class KafkaJoinGen2Demo {
public static void main(String[] args) throws Exception {
// 1. Set up the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka source configuration, update with your broker IPs
String brokers = "<broker-ip>:9092,<broker-ip>:9092,<broker-ip>:9092";
String inputTopic = "user_events";
String outputTopic = "user_events_output";
String groupId = "my_group";
// 2. Register the cached file, update your container name and storage name
env.registerCachedFile("abfs://<container-name>@<storagename>.dfs.core.windows.net/flink/data/item.txt", "file1");
// 3. Read a stream of real-time user behavior event from a Kafka topic
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics(inputTopic)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafkaData = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// Parse Kafka source data
DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
@Override
public Tuple4<String, String, String, String> map(String value) throws Exception {
// Parse the line into a Tuple4
String[] parts = value.split(",");
if (parts.length < 4) {
// Log and skip malformed record
System.out.println("Malformed record: " + value);
return null;
}
return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
}
});
// 4. Enrich the user activity events by joining the items' attributes from a file
DataStream<Tuple7<String,String,String,String,String,String,String>> enrichedData = userEvents.map(new MyJoinFunction());
// 5. Output the enriched user activity events to a Kafka topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(outputTopic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build();
enrichedData.map(value -> value.toString()).sinkTo(sink);
// 6. Execute the Flink job
env.execute("Kafka Join Batch gen2 file, sink to another Kafka Topic");
}
private static class MyJoinFunction extends RichMapFunction<Tuple4<String,String,String,String>, Tuple7<String,String,String,String,String,String,String>> {
private Map<String, Tuple4<String, String, String, String>> itemAttributes;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Read the cached file and parse its contents into a map
itemAttributes = new HashMap<>();
try (BufferedReader reader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile("file1")))) {
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(",");
itemAttributes.put(parts[0], new Tuple4<>(parts[0], parts[1], parts[2], parts[3]));
}
}
}
@Override
public Tuple7<String,String,String,String,String,String,String> map(Tuple4<String,String,String,String> value) throws Exception {
Tuple4<String, String, String, String> broadcastValue = itemAttributes.get(value.f1);
if (broadcastValue != null) {
return Tuple7.of(value.f0,value.f1,value.f2,value.f3,broadcastValue.f1,broadcastValue.f2,broadcastValue.f3);
} else {
return null;
}
}
}
}
Empaqueta el archivo jar y envíalo a Apache Flink
Estamos enviando el archivo jar empaquetado a Flink:
Producir un tema user_events
en tiempo real en Kafka
Podemos generar eventos de comportamiento de usuario en tiempo real user_events
en Kafka.
Consumir la unión de itemAttributes
con user_events
en Kafka
Ahora usamos itemAttributes
en los eventos de actividad de usuario con unión al sistema de archivos user_events
.
Seguimos produciendo y consumiendo la actividad de usuario y los atributos del elemento en las siguientes imágenes.
Referencia
- ejemplos de Flink
- Sitio web de Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink y los nombres de proyecto de código abierto asociados son marcas comerciales de la Apache Software Foundation (ASF).