Wzbogacanie zdarzeń z platformy Apache Kafka® za pomocą atrybutów usługi ADLS Gen2 za pomocą narzędzia Apache Flink®
Uwaga
Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.
Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.
Ważne
Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.
W tym artykule dowiesz się, jak wzbogacić zdarzenia w czasie rzeczywistym, dołączając strumień z platformy Kafka za pomocą tabeli w usłudze ADLS Gen2 przy użyciu przesyłania strumieniowego Flink. Interfejs API przesyłania strumieniowego Flink jest używany do dołączania zdarzeń z usługi HDInsight Kafka z atrybutami z usługi ADLS Gen2. Ponadto do ujścia do innego tematu platformy Kafka używamy zdarzeń przyłączonych do atrybutów.
Wymagania wstępne
- Klaster Flink w usłudze HDInsight w usłudze AKS
- Klaster Kafka w usłudze HDInsight
- Upewnij się, że ustawienia sieci są zadbane zgodnie z opisem w temacie Using Kafka on HDInsight on HDInsight on AKS and HDInsight clusters are in the same VNet (Korzystanie z platformy Kafka w usłudze HDInsight w usłudze HDInsight w usłudze AKS i klastrach usługi HDInsight )
- Na potrzeby tego pokazu używamy maszyny wirtualnej z systemem Windows jako środowiska programistycznego projektu maven w tej samej sieci wirtualnej co usługa HDInsight w usłudze AKS
Przygotowanie tematu platformy Kafka
Tworzymy temat o nazwie user_events
.
- Celem jest odczytywanie strumienia zdarzeń w czasie rzeczywistym z tematu platformy Kafka przy użyciu funkcji Flink. Każde zdarzenie ma następujące pola:
user_id, item_id, type, timestamp,
Kafka 3.2.0
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events --bootstrap-server wn0-contsk:9092
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_events_output --bootstrap-server wn0-contsk:9092
Przygotowywanie pliku w usłudze ADLS Gen2
Tworzymy plik o nazwie item attributes
w magazynie
- Celem jest odczytanie partii
item attributes
z pliku w usłudze ADLS Gen2. Każdy element ma następujące pola:item_id, brand, category, timestamp,
Opracowywanie zadania platformy Apache Flink
W tym kroku wykonamy następujące działania
- Wzbogacanie tematu
user_events
z platformy Kafka przez dołączenie zitem attributes
pliku w usłudze ADLS Gen2. - Wypychamy wynik tego kroku jako wzbogaconą aktywność użytkownika zdarzeń do tematu platformy Kafka.
Opracowywanie projektu Maven
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkKafkaJoinGen2</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Dołącz do tematu platformy Kafka przy użyciu pliku usługi ADLS Gen2
KafkaJoinGen2Demo.java
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
public class KafkaJoinGen2Demo {
public static void main(String[] args) throws Exception {
// 1. Set up the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka source configuration, update with your broker IPs
String brokers = "<broker-ip>:9092,<broker-ip>:9092,<broker-ip>:9092";
String inputTopic = "user_events";
String outputTopic = "user_events_output";
String groupId = "my_group";
// 2. Register the cached file, update your container name and storage name
env.registerCachedFile("abfs://<container-name>@<storagename>.dfs.core.windows.net/flink/data/item.txt", "file1");
// 3. Read a stream of real-time user behavior event from a Kafka topic
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics(inputTopic)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafkaData = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// Parse Kafka source data
DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
@Override
public Tuple4<String, String, String, String> map(String value) throws Exception {
// Parse the line into a Tuple4
String[] parts = value.split(",");
if (parts.length < 4) {
// Log and skip malformed record
System.out.println("Malformed record: " + value);
return null;
}
return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
}
});
// 4. Enrich the user activity events by joining the items' attributes from a file
DataStream<Tuple7<String,String,String,String,String,String,String>> enrichedData = userEvents.map(new MyJoinFunction());
// 5. Output the enriched user activity events to a Kafka topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(outputTopic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build();
enrichedData.map(value -> value.toString()).sinkTo(sink);
// 6. Execute the Flink job
env.execute("Kafka Join Batch gen2 file, sink to another Kafka Topic");
}
private static class MyJoinFunction extends RichMapFunction<Tuple4<String,String,String,String>, Tuple7<String,String,String,String,String,String,String>> {
private Map<String, Tuple4<String, String, String, String>> itemAttributes;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Read the cached file and parse its contents into a map
itemAttributes = new HashMap<>();
try (BufferedReader reader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile("file1")))) {
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(",");
itemAttributes.put(parts[0], new Tuple4<>(parts[0], parts[1], parts[2], parts[3]));
}
}
}
@Override
public Tuple7<String,String,String,String,String,String,String> map(Tuple4<String,String,String,String> value) throws Exception {
Tuple4<String, String, String, String> broadcastValue = itemAttributes.get(value.f1);
if (broadcastValue != null) {
return Tuple7.of(value.f0,value.f1,value.f2,value.f3,broadcastValue.f1,broadcastValue.f2,broadcastValue.f3);
} else {
return null;
}
}
}
}
Plik jar pakietu i prześlij do narzędzia Apache Flink
Przesyłamy spakowany plik jar do Flink:
Tworzenie tematu w czasie user_events
rzeczywistym na platformie Kafka
Na platformie Kafka możemy utworzyć zdarzenie user_events
zachowania użytkownika w czasie rzeczywistym.
Korzystanie z user_events
dołączania do platformy itemAttributes
Kafka
Teraz używamy itemAttributes
zdarzeń user_events
aktywności użytkownika dołączania systemu plików.
Kontynuujemy tworzenie i używanie atrybutów działań użytkownika i elementów na poniższych obrazach
Odwołanie
- Przykłady
- Witryna internetowa platformy Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink i skojarzone nazwy projektów typu open source są znakami towarowymi platformy Apache Software Foundation (ASF).