共用方式為


將 Apache Flink® 在 AKS 上的 HDInsight 連接到適用於 Apache Kafka® 的 Azure 事件中樞

重要

AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解

您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。

重要

這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需有關此特定預覽的資訊,請參閱 AKS 上的 Azure HDInsight 預覽資訊。 如果有問題或功能建議,請在 AskHDInsight 上提交請求,並追蹤我們以獲取 Azure HDInsight 社群的更多更新。

Apache Flink 的已知使用案例是串流分析。 許多使用者使用使用 Apache Kafka 擷取的數據流的熱門選擇。 Flink 和 Kafka 的一般安裝通常是從將事件串流推送至 Kafka 開始,然後這些串流可以被 Flink 作業消耗。 Azure 事件中樞會在事件中樞上提供 Apache Kafka 端點,讓用戶能夠使用 Kafka 通訊協定連線到事件中樞。

在本文中,我們會探索如何在 AKS 上,使用 HDInsight 上的 Apache Flink 連線 Azure 事件中樞,並涵蓋下列內容:

  • 建立事件中樞命名空間
  • 使用 Apache Flink 在 AKS 叢集上建立 HDInsight
  • 執行 Flink 產生者
  • 適用於 Apache Flink 的套件 Jar
  • 作業提交 & 驗證

建立 Event Hubs 命名空間和事件中樞

  1. 若要建立事件中樞命名空間和事件中樞,請參閱此處

    顯示事件中樞設定的螢幕快照。

  1. 在 AKS 叢集集區上使用現有的 HDInsight,您可以建立 Flink 叢集

  2. 執行 Flink 生產者程序,並新增 bootstrap.serversproducer.config 的相關資訊。

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. {YOUR.EVENTHUBS.CONNECTION.STRING} 替換為事件中樞命名空間的連接字串。 如需取得連接字串的指示,請參閱如何 取得事件中樞連接字串的詳細數據,

    例如

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. 套件 com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. 添加程式碼片段以運行 Flink 生產者。

    顯示如何在事件中樞中測試 Flink 的螢幕快照。

  3. 執行程式代碼之後,事件會儲存在主題 “topic1”

    螢幕快照,顯示在 Topic 中儲存的事件中樞。

參考