共用方式為


使用 Apache Flink® DataStream API 將事件訊息寫入 Azure Data Lake Storage Gen2

重要

AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解

您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。

重要

這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊上的 Azure HDInsight。 如需問題或功能建議,請提交要求 AskHDInsight,然後追蹤我們以獲取 Azure HDInsight 社群 的最新資訊。

Apache Flink 會使用檔案系統來取用並持續儲存數據,無論是應用程式的結果,還是容錯和復原。 在本文中,瞭解如何使用 DataStream API 將事件訊息寫入 Azure Data Lake Storage Gen2。

先決條件

  • AKS 上 HDInsight 上的 Apache Flink 叢集
  • 在 HDInsight Apache Kafka 叢集
  • 使用 MSI 存取 ADLS Gen2
  • IntelliJ 用於在 AKS 虛擬網路上 HDInsight 中的 Azure VM 上進行開發

此文件系統連接器為 BATCH 和 STREAMING 提供相同的保證,其設計目的是為串流執行提供一次語意。 如需詳細資訊,請參閱 Flink DataStream 檔案系統

Apache Kafka 連接器

Flink 提供 Apache Kafka 連接器,可讓您在具備恰好一次保證的情況下,從 Kafka 主題讀取資料,並將資料寫入 Kafka 主題。 如需詳細資訊,請參閱 Apache Kafka Connector

IntelliJ IDEA 上的pom.xml

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

ADLS Gen2 接收程式

abfsGen2.java

注意

將 HDInsight 叢集上的 Apache Kafka 的 bootStrapServers 替換為您自己的 Kafka 3.2 代理伺服器

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class KafkaSinkToGen2 {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         
        Configuration flinkConfig = new Configuration(); 

         flinkConfig.setString("classloader.resolve-order", "parent-first"); 

         env.getConfig().setGlobalJobParameters(flinkConfig);  

        // 2. read kafka message as stream input, update your broker ip's
        String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("click_events")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        // 3. sink to gen2, update container name and storage path
        String outputPath  = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        stream.sinkTo(sink);

        // 4. run stream
        env.execute("Kafka Sink To Gen2");
    }
}

將 JAR 包裝,並提交至 Apache Flink。

  1. 將 jar 上傳至 ABFS。

    顯示 Flink 應用程式模式畫面的螢幕快照。

  2. AppMode 叢集建立中傳遞任務 jar 資訊。

    顯示建立應用程式模式的螢幕快照。

    注意

    請務必將 classloader.resolve-order 新增為 'parent-first',並將 hadoop.classpath.enable 新增為 true

  3. 選取 [作業記錄匯總] 以將作業記錄推送至記憶體帳戶。

    顯示如何啟用作業記錄的螢幕快照。

  4. 您可以看到作業正在執行。

    顯示 Flink UI 的螢幕快照。

驗證 ADLS Gen2 上的串流數據

我們看到 click_events 正流入 ADLS Gen2。

顯示ADLS Gen2輸出的螢幕快照。 顯示 Flink click 事件輸出的螢幕快照。

您可以指定一個滾動策略,以在下列三個條件下滾動進行中的部份檔案:

.withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(5))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())

參考