Wzbogacanie zdarzeń z platformy Apache Kafka® za pomocą atrybutów usługi ADLS Gen2 za pomocą narzędzia Apache Flink®
Ważny
Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej z tego ogłoszenia.
Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.
Ważny
Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe warunki użytkowania platformy Microsoft Azure zawierają więcej warunków prawnych, które dotyczą funkcji platformy Azure w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej Azure HDInsight na AKS, zobacz informacje o wersji zapoznawczej. W przypadku pytań lub sugestii dotyczących funkcji, prosimy o przesłanie zapytania na AskHDInsight z podanymi szczegółami oraz śledzenie nas po więcej aktualności w Społeczności Azure HDInsight.
W tym artykule dowiesz się, jak można wzbogacić zdarzenia w czasie rzeczywistym, łącząc strumień z platformy Kafka z tabelą znajdującą się w usłudze ADLS Gen2 przy użyciu przesyłania strumieniowego Flink. Używamy Flink Streaming API do łączenia zdarzeń z HDInsight Kafka z atrybutami z ADLS Gen2. Ponadto do ujścia do innego tematu platformy Kafka używamy zdarzeń przyłączonych do atrybutów.
Warunki wstępne
- klaster Flink w usłudze HDInsight na AKS
-
klaster Kafka w usłudze HDInsight
- Upewnij się, że ustawienia sieci są zadbane zgodnie z opisem w temacie Korzystanie z Kafki na platformie HDInsight, aby upewnić się, że HDInsight na AKS i klastry HDInsight znajdują się w tej samej sieci wirtualnej (VNet).
- Na potrzeby tego pokazu używamy Windows VM jako środowiska do rozwijania projektów Maven w tej samej sieci VNet co HDInsight na AKS.
Przygotowanie tematu platformy Kafka
Tworzymy temat o nazwie user_events
.
- Celem jest odczytywanie strumienia zdarzeń w czasie rzeczywistym z tematu platformy Kafka przy użyciu funkcji Flink. Każde zdarzenie ma następujące pola:
user_id, item_id, type, timestamp,
Kafka 3.2.0
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events --bootstrap-server wn0-contsk:9092
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events_output --bootstrap-server wn0-contsk:9092
Przygotowywanie pliku w usłudze ADLS Gen2
Tworzymy plik o nazwie item attributes
w magazynie
- Celem jest odczytanie partii
item attributes
z pliku w usłudze ADLS Gen2. Każdy element ma następujące pola:item_id, brand, category, timestamp,
Rozwiń zadanie Apache Flink
W tym kroku wykonamy następujące działania
- Wzbogać temat
user_events
z platformy Kafka poprzez połączenie zitem attributes
z pliku w usłudze ADLS Gen2. - Wypychamy wynik tego kroku jako wzbogaconą aktywność użytkownika dotyczącą zdarzeń do tematu w systemie Kafka.
Opracowywanie projektu Maven
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkKafkaJoinGen2</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Dołącz do tematu Kafka z plikiem ADLS Gen2
KafkaJoinGen2Demo.java
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
public class KafkaJoinGen2Demo {
public static void main(String[] args) throws Exception {
// 1. Set up the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka source configuration, update with your broker IPs
String brokers = "<broker-ip>:9092,<broker-ip>:9092,<broker-ip>:9092";
String inputTopic = "user_events";
String outputTopic = "user_events_output";
String groupId = "my_group";
// 2. Register the cached file, update your container name and storage name
env.registerCachedFile("abfs://<container-name>@<storagename>.dfs.core.windows.net/flink/data/item.txt", "file1");
// 3. Read a stream of real-time user behavior event from a Kafka topic
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics(inputTopic)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafkaData = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// Parse Kafka source data
DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
@Override
public Tuple4<String, String, String, String> map(String value) throws Exception {
// Parse the line into a Tuple4
String[] parts = value.split(",");
if (parts.length < 4) {
// Log and skip malformed record
System.out.println("Malformed record: " + value);
return null;
}
return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
}
});
// 4. Enrich the user activity events by joining the items' attributes from a file
DataStream<Tuple7<String,String,String,String,String,String,String>> enrichedData = userEvents.map(new MyJoinFunction());
// 5. Output the enriched user activity events to a Kafka topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(outputTopic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build();
enrichedData.map(value -> value.toString()).sinkTo(sink);
// 6. Execute the Flink job
env.execute("Kafka Join Batch gen2 file, sink to another Kafka Topic");
}
private static class MyJoinFunction extends RichMapFunction<Tuple4<String,String,String,String>, Tuple7<String,String,String,String,String,String,String>> {
private Map<String, Tuple4<String, String, String, String>> itemAttributes;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Read the cached file and parse its contents into a map
itemAttributes = new HashMap<>();
try (BufferedReader reader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile("file1")))) {
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(",");
itemAttributes.put(parts[0], new Tuple4<>(parts[0], parts[1], parts[2], parts[3]));
}
}
}
@Override
public Tuple7<String,String,String,String,String,String,String> map(Tuple4<String,String,String,String> value) throws Exception {
Tuple4<String, String, String, String> broadcastValue = itemAttributes.get(value.f1);
if (broadcastValue != null) {
return Tuple7.of(value.f0,value.f1,value.f2,value.f3,broadcastValue.f1,broadcastValue.f2,broadcastValue.f3);
} else {
return null;
}
}
}
}
Spakuj jar i prześlij do systemu Apache Flink
Przesyłamy spakowany plik jar do Flink:
Tworzenie tematu user_events
w czasie rzeczywistym na platformie Kafka
Możemy utworzyć zdarzenie zachowania użytkownika w czasie rzeczywistym user_events
na platformie Kafka.
Konsumowanie itemAttributes
połączonych z user_events
w Kafce
Teraz używamy itemAttributes
do zdarzeń aktywności użytkownika związanych z łączeniem systemu plików user_events
.
Kontynuujemy tworzenie i używanie atrybutów działań użytkownika i elementów na poniższych obrazach
Odniesienie
- Przykłady Flinka
- Apache Flink strona internetowa
- Nazwy projektów typu open source Apache, Apache Kafka, Kafka, Apache Flink, Flink oraz innych powiązanych projektów open source są znakami towarowymiApache Software Foundation (ASF).