Schreiben von Ereignisnachrichten in Azure Data Lake Storage Gen2 mit Apache Flink® DataStream-API
Wichtig
Azure HDInsight auf AKS wurde am 31. Januar 2025 eingestellt. Erfahren Sie mehr mit dieser Ankündigung.
Sie müssen Ihre Workloads zu Microsoft Fabric oder ein gleichwertiges Azure-Produkt migrieren, um eine abrupte Beendigung Ihrer Workloads zu vermeiden.
Wichtig
Dieses Feature befindet sich derzeit in der Vorschau. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure Previews weitere rechtliche Bestimmungen enthalten, die für Azure-Features gelten, die in der Betaversion, in der Vorschau oder auf andere Weise noch nicht in die allgemeine Verfügbarkeit veröffentlicht werden. Informationen zu dieser spezifischen Vorschau finden Sie unter Azure HDInsight in AKS-Vorschauinformationen. Für Fragen oder Funktionsvorschläge reichen Sie bitte einen Antrag auf AskHDInsight ein, mit den Details, und folgen Sie uns für weitere Updates zur Azure HDInsight Community.
Apache Flink verwendet Dateisysteme, um Daten sowohl für die Ergebnisse von Anwendungen als auch für die Fehlertoleranz und Wiederherstellung zu nutzen und dauerhaft zu speichern. In diesem Artikel erfahren Sie, wie Sie Ereignisnachrichten mit der DataStream-API in Azure Data Lake Storage Gen2 schreiben.
Voraussetzungen
- Apache Flink-Cluster auf HDInsight auf AKS
-
Apache Kafka Cluster auf HDInsight
- Sie müssen dafür sorgen, dass die Netzwerkeinstellungen wie in Using Apache Kafka on HDInsightbeschrieben gehandhabt werden. Stellen Sie sicher, dass HDInsight auf AKS und HDInsight-Clustern sich im selben virtuellen Netzwerk befinden.
- Verwenden von MSI für den Zugriff auf ADLS Gen2
- IntelliJ für die Entwicklung auf einem virtuellen Azure-Computer in HDInsight im virtuellen AKS-Netzwerk
Apache Flink FileSystem Connector
Dieser Dateisystemconnector bietet die gleichen Garantien für BATCH und STREAMING und ist so konzipiert, dass genau einmal Semantik für die STREAMING-Ausführung bereitgestellt wird. Weitere Informationen finden Sie unter Flink DataStream Filesystem.
Apache Kafka Connector
Flink bietet einen Apache Kafka-Connector zum Lesen von Daten aus und Schreiben von Daten in Kafka-Topics mit genaue Einmal-Garantien. Weitere Informationen finden Sie unter Apache Kafka Connector.
Erstellen des Projekts für Apache Flink
pom.xml auf IntelliJ IDEA
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Programm für ADLS Gen2 Sink
abfsGen2.java
Anmerkung
Ersetzen Sie Apache Kafka auf dem HDInsight-Cluster bootStrapServers durch Ihre eigenen Broker für Kafka 3.2
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class KafkaSinkToGen2 {
public static void main(String[] args) throws Exception {
// 1. get stream execution env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration flinkConfig = new Configuration();
flinkConfig.setString("classloader.resolve-order", "parent-first");
env.getConfig().setGlobalJobParameters(flinkConfig);
// 2. read kafka message as stream input, update your broker ip's
String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("click_events")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.print();
// 3. sink to gen2, update container name and storage path
String outputPath = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
stream.sinkTo(sink);
// 4. run stream
env.execute("Kafka Sink To Gen2");
}
}
Paketieren Sie die Jar-Datei und übermitteln Sie sie an Apache Flink.
Laden Sie das Jar in ABFS hoch.
Übergeben Sie die Job-Jar-Informationen bei der
AppMode
-Clustererstellung.Anmerkung
Stellen Sie sicher, dass Sie "classloader.resolve-order" auf "parent-first" und "hadoop.classpath.enable" auf
true
setzen.Wählen Sie "Auftragsprotokollaggregation" aus, um Auftragsprotokolle in ein Speicherkonto zu übertragen.
Sie können beobachten, wie der Auftrag ausgeführt wird.
Validieren von Streamingdaten auf ADLS Gen2
Wir sehen, wie das click_events
in ADLS Gen2 streamt.
Sie können eine Strategie für das Rollieren festlegen, die die Teildatei in Bearbeitung unter einer der folgenden drei Bedingungen rollt:
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(5))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
Referenz
- Apache Kafka Connector
- Flink DataStream Filesystem
- Apache Flink Website
- Apache, Apache Kafka, Kafka, Apache Flink, Flink und zugehörige Open Source-Projektnamen sind Marken der Apache Software Foundation (ASF).