使用 Apache Flink® DataStream API 將事件訊息寫入 Azure Data Lake Storage Gen2
重要
AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解。
您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。
重要
這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊上的 Azure HDInsight。 如需問題或功能建議,請提交要求 AskHDInsight,然後追蹤我們以獲取 Azure HDInsight 社群 的最新資訊。
Apache Flink 會使用檔案系統來取用並持續儲存數據,無論是應用程式的結果,還是容錯和復原。 在本文中,瞭解如何使用 DataStream API 將事件訊息寫入 Azure Data Lake Storage Gen2。
先決條件
- AKS 上 HDInsight 上的 Apache Flink 叢集
- 在 HDInsight 上 Apache Kafka 叢集
- 您必須確保按照 在 HDInsight 上使用 Apache Kafka 中所述來妥善處理網路設定。 請確定 AKS 和 HDInsight 叢集上的 HDInsight 位於相同的虛擬網路中。
- 使用 MSI 存取 ADLS Gen2
- IntelliJ 用於在 AKS 虛擬網路上 HDInsight 中的 Azure VM 上進行開發
Apache Flink FileSystem 連接器
此文件系統連接器為 BATCH 和 STREAMING 提供相同的保證,其設計目的是為串流執行提供一次語意。 如需詳細資訊,請參閱 Flink DataStream 檔案系統。
Apache Kafka 連接器
Flink 提供 Apache Kafka 連接器,可讓您在具備恰好一次保證的情況下,從 Kafka 主題讀取資料,並將資料寫入 Kafka 主題。 如需詳細資訊,請參閱 Apache Kafka Connector。
建置 Apache Flink 的專案
IntelliJ IDEA 上的pom.xml
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
ADLS Gen2 接收程式
abfsGen2.java
注意
將 HDInsight 叢集上的 Apache Kafka 的 bootStrapServers 替換為您自己的 Kafka 3.2 代理伺服器
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class KafkaSinkToGen2 {
public static void main(String[] args) throws Exception {
// 1. get stream execution env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration flinkConfig = new Configuration();
flinkConfig.setString("classloader.resolve-order", "parent-first");
env.getConfig().setGlobalJobParameters(flinkConfig);
// 2. read kafka message as stream input, update your broker ip's
String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("click_events")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.print();
// 3. sink to gen2, update container name and storage path
String outputPath = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
stream.sinkTo(sink);
// 4. run stream
env.execute("Kafka Sink To Gen2");
}
}
將 JAR 包裝,並提交至 Apache Flink。
將 jar 上傳至 ABFS。
在
AppMode
叢集建立中傳遞任務 jar 資訊。注意
請務必將 classloader.resolve-order 新增為 'parent-first',並將 hadoop.classpath.enable 新增為
true
選取 [作業記錄匯總] 以將作業記錄推送至記憶體帳戶。
您可以看到作業正在執行。
驗證 ADLS Gen2 上的串流數據
我們看到 click_events
正流入 ADLS Gen2。
您可以指定一個滾動策略,以在下列三個條件下滾動進行中的部份檔案:
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(5))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
參考
- Apache Kafka Connector
- Flink DataStream 檔案系統
- Apache Flink 網站
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和相關聯的開放原始碼專案名稱 Apache Software Foundation (ASF) 商標。