Teilen über


Schreiben von Ereignisnachrichten in Azure Data Lake Storage Gen2 mit Apache Flink® DataStream-API

Wichtig

Azure HDInsight auf AKS wurde am 31. Januar 2025 eingestellt. Erfahren Sie mehr mit dieser Ankündigung.

Sie müssen Ihre Workloads zu Microsoft Fabric oder ein gleichwertiges Azure-Produkt migrieren, um eine abrupte Beendigung Ihrer Workloads zu vermeiden.

Wichtig

Dieses Feature befindet sich derzeit in der Vorschau. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure Previews weitere rechtliche Bestimmungen enthalten, die für Azure-Features gelten, die in der Betaversion, in der Vorschau oder auf andere Weise noch nicht in die allgemeine Verfügbarkeit veröffentlicht werden. Informationen zu dieser spezifischen Vorschau finden Sie unter Azure HDInsight in AKS-Vorschauinformationen. Für Fragen oder Funktionsvorschläge reichen Sie bitte einen Antrag auf AskHDInsight ein, mit den Details, und folgen Sie uns für weitere Updates zur Azure HDInsight Community.

Apache Flink verwendet Dateisysteme, um Daten sowohl für die Ergebnisse von Anwendungen als auch für die Fehlertoleranz und Wiederherstellung zu nutzen und dauerhaft zu speichern. In diesem Artikel erfahren Sie, wie Sie Ereignisnachrichten mit der DataStream-API in Azure Data Lake Storage Gen2 schreiben.

Voraussetzungen

Dieser Dateisystemconnector bietet die gleichen Garantien für BATCH und STREAMING und ist so konzipiert, dass genau einmal Semantik für die STREAMING-Ausführung bereitgestellt wird. Weitere Informationen finden Sie unter Flink DataStream Filesystem.

Apache Kafka Connector

Flink bietet einen Apache Kafka-Connector zum Lesen von Daten aus und Schreiben von Daten in Kafka-Topics mit genaue Einmal-Garantien. Weitere Informationen finden Sie unter Apache Kafka Connector.

pom.xml auf IntelliJ IDEA

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Programm für ADLS Gen2 Sink

abfsGen2.java

Anmerkung

Ersetzen Sie Apache Kafka auf dem HDInsight-Cluster bootStrapServers durch Ihre eigenen Broker für Kafka 3.2

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class KafkaSinkToGen2 {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         
        Configuration flinkConfig = new Configuration(); 

         flinkConfig.setString("classloader.resolve-order", "parent-first"); 

         env.getConfig().setGlobalJobParameters(flinkConfig);  

        // 2. read kafka message as stream input, update your broker ip's
        String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("click_events")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        // 3. sink to gen2, update container name and storage path
        String outputPath  = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        stream.sinkTo(sink);

        // 4. run stream
        env.execute("Kafka Sink To Gen2");
    }
}

Paketieren Sie die Jar-Datei und übermitteln Sie sie an Apache Flink.

  1. Laden Sie das Jar in ABFS hoch.

    Screenshot zeigt den Flink-App-Modus-Bildschirm.

  2. Übergeben Sie die Job-Jar-Informationen bei der AppMode-Clustererstellung.

    Screenshot mit dem App-Modus erstellen.

    Anmerkung

    Stellen Sie sicher, dass Sie "classloader.resolve-order" auf "parent-first" und "hadoop.classpath.enable" auf true setzen.

  3. Wählen Sie "Auftragsprotokollaggregation" aus, um Auftragsprotokolle in ein Speicherkonto zu übertragen.

    Screenshot, der zeigt, wie man das Auftragsprotokoll aktiviert.

  4. Sie können beobachten, wie der Auftrag ausgeführt wird.

    Screenshot, der die Flink-Benutzeroberfläche zeigt.

Validieren von Streamingdaten auf ADLS Gen2

Wir sehen, wie das click_events in ADLS Gen2 streamt.

Bildschirmfoto mit ADLS Gen2-Ausgabe. Bildschirmfoto mit Ausgabe des Flink-Klick-Ereignisses.

Sie können eine Strategie für das Rollieren festlegen, die die Teildatei in Bearbeitung unter einer der folgenden drei Bedingungen rollt:

.withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(5))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())

Referenz