在 HDInsight 上使用 Apache Kafka® 與在 AKS 上的 HDInsight 上使用 Apache Flink®
重要
AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解。
您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。
重要
這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需此特定預覽的相關資訊,請參閱 AKS 上的 Azure HDInsight 預覽資訊。 如有問題或功能建議,請在 AskHDInsight 提交反饋,並關注我們以獲得更多 Azure HDInsight 社群 的更新。
Apache Flink 的已知使用案例是串流分析。 許多使用者的熱門選擇是使用 Apache Kafka 擷取的數據流。 一般的 Flink 和 Kafka 安裝流程會先將事件串流推送至 Kafka,然後由 Flink 作業進行消耗。
此範例在執行 Flink 1.17.0 的 AKS 叢集上使用 HDInsight 來處理取用和產生 Kafka 主題的串流數據。
注意
FlinkKafkaConsumer 已被取代,且將會隨著 Flink 1.17 一起移除,請改用 KafkaSource。 FlinkKafkaProducer 已被取代,並將使用 Flink 1.15 移除,請改用 KafkaSink。
先決條件
Kafka 和 Flink 都必須位於相同的 VNet 中,或兩個叢集之間應該有 vnet 對等互連。
在相同的 VNet中建立 Kafka 叢集。 您可以根據目前的使用量,選擇 HDInsight 上的 Kafka 3.2 或 2.4。
在虛擬網路區段中新增 VNet 詳細數據。
在 AKS 叢集區上建立 HDInsight,使用與 相同的 VNet。
將 Flink 叢集建立到該叢集集區。
Apache Kafka 連接器
Flink 提供 Apache Kafka Connector,具備從 Kafka 主題讀取數據並寫入 Kafka 主題至多一次的交付保證。
Maven 相依性
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
建置 Kafka 匯入
Kafka 接收器提供建置器類別來建立 KafkaSink 的實例。 我們使用相同的技術來建構接收端,並與在 AKS 上執行的 HDInsight 中的 Flink 叢集結合使用。
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
撰寫 Java 程式 Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
打包 jar 檔案並將作業提交到 Flink
在 Webssh 上,上傳 jar 並提交 jar
在 Flink 儀錶板 UI 上
產生主題 - 按兩下 Kafka
消費主題 - 在 Kafka 上的事件
參考
- Apache Kafka Connector
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和相關聯的開放原始碼專案名稱 Apache Software Foundation (ASF) 商標。