將 Apache Flink® 在 AKS 上的 HDInsight 連接到適用於 Apache Kafka® 的 Azure 事件中樞
重要
AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解。
您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。
重要
這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需有關此特定預覽的資訊,請參閱 AKS 上的 Azure HDInsight 預覽資訊。 如果有問題或功能建議,請在 AskHDInsight 上提交請求,並追蹤我們以獲取 Azure HDInsight 社群的更多更新。
Apache Flink 的已知使用案例是串流分析。 許多使用者使用使用 Apache Kafka 擷取的數據流的熱門選擇。 Flink 和 Kafka 的一般安裝通常是從將事件串流推送至 Kafka 開始,然後這些串流可以被 Flink 作業消耗。 Azure 事件中樞會在事件中樞上提供 Apache Kafka 端點,讓用戶能夠使用 Kafka 通訊協定連線到事件中樞。
在本文中,我們會探索如何在 AKS 上,使用 HDInsight 上的 Apache Flink 連線 Azure 事件中樞,並涵蓋下列內容:
- 建立事件中樞命名空間
- 使用 Apache Flink 在 AKS 叢集上建立 HDInsight
- 執行 Flink 產生者
- 適用於 Apache Flink 的套件 Jar
- 作業提交 & 驗證
建立 Event Hubs 命名空間和事件中樞
在 AKS 的 HDInsight 上設定 Flink 叢集
在 AKS 叢集集區上使用現有的 HDInsight,您可以建立 Flink 叢集
執行 Flink 生產者程序,並新增 bootstrap.servers 和
producer.config
的相關資訊。bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
將
{YOUR.EVENTHUBS.CONNECTION.STRING}
替換為事件中樞命名空間的連接字串。 如需取得連接字串的指示,請參閱如何 取得事件中樞連接字串的詳細數據,。例如
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
封裝適用於 Flink 的 JAR
套件 com.example.app;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
添加程式碼片段以運行 Flink 生產者。
執行程式代碼之後,事件會儲存在主題 “topic1”
參考
- Apache Flink 網站
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和相關聯的開放原始碼專案名稱是Apache Software Foundation(ASF)的商標。