使用 Apache Flink® DataStream API 将事件消息写入 Azure Data Lake Storage Gen2
重要
AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 了解此公告的详细信息。
需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。
重要
此功能目前以预览版提供。 Microsoft Azure 预览版补充使用条款 包括适用于 beta 版、预览版或尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览的信息,请参阅 AKS 预览信息的 Azure HDInsight。 有关问题或功能建议,请在 AskHDInsight 上提交请求,并提供详细信息。关注我们以获取 Azure HDInsight 社区 的更多更新。
Apache Flink 使用文件系统来使用和持久存储数据,无论是应用程序的结果还是容错和恢复。 本文介绍如何使用 DataStream API 将事件消息写入 Azure Data Lake Storage Gen2。
先决条件
- AKS 上的 HDInsight 上的 Apache Flink 群集
-
HDInsight 上的 Apache Kafka 群集
- 您需要确保按照 使用 Apache Kafka on HDInsight中的描述正确设置网络。 请确保 AKS 和 HDInsight 群集上的 HDInsight 位于同一虚拟网络中。
- 使用 MSI 访问 ADLS Gen2
- IntelliJ 用于在 AKS 虚拟网络的 HDInsight 上的 Azure VM 进行开发
Apache Flink FileSystem 连接器
此文件系统连接器为 BATCH 和 STREAMING 提供相同的保证,旨在为流处理执行提供精确一次语义。 有关详细信息,请参阅 Flink DataStream 文件系统。
Apache Kafka 连接器
Flink 提供了一个 Apache Kafka 连接器,用于从 Kafka 主题读取数据和将数据写入 Kafka 主题,并保证精确一次性处理。 有关详细信息,请参阅 Apache Kafka 连接器。
为 Apache Flink 生成项目
和pom.xml 在 IntelliJ IDEA 上
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
ADLS Gen2 接收器 的 计划
abfsGen2.java
注意
将 Apache Kafka on HDInsight 群集的 bootStrapServers 替换为您自己的 Kafka 3.2 代理服务器。
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class KafkaSinkToGen2 {
public static void main(String[] args) throws Exception {
// 1. get stream execution env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration flinkConfig = new Configuration();
flinkConfig.setString("classloader.resolve-order", "parent-first");
env.getConfig().setGlobalJobParameters(flinkConfig);
// 2. read kafka message as stream input, update your broker ip's
String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("click_events")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.print();
// 3. sink to gen2, update container name and storage path
String outputPath = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
stream.sinkTo(sink);
// 4. run stream
env.execute("Kafka Sink To Gen2");
}
}
包 jar,并提交到 Apache Flink。
将 jar 上传到 ABFS。
在创建群集
AppMode
时,传递作业 jar 信息。注意
请确保将 classloader.resolve-order 添加为“parent-first”,并将 hadoop.classpath.enable 添加为
true
选择作业日志聚合,将作业日志推送到存储帐户。
可以看到任务正在运行。
验证 ADLS Gen2 上的流式数据
我们正在目睹 click_events
流入 ADLS Gen2。
您可以通过以下三个条件中的任何一个来指定正在写入文件的滚动策略:
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(5))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
参考
- Apache Kafka 连接器
- Flink DataStream 文件系统
- Apache Flink 网站
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和关联的开源项目名称是 Apache Software Foundation(ASF)商标。