次の方法で共有


AKS 上の HDInsight 上の Apache Flink® を Azure Event Hubs for Apache Kafka® に接続する

大事な

AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 この発表 を元に、さらに詳しくをご覧ください。

ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。

大事な

この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案については、詳細を記載して AskHDInsight にリクエストを送信し、Azure HDInsight Community をフォローして最新情報をぜひご確認ください。

Apache Flink のよく知られているユース ケースは、ストリーム分析です。 多くのユーザーが Apache Kafka を使用して取り込まれるデータストリームを使用することを一般的に選んでいます。 Flink と Kafka の一般的なインストールは、Flink ジョブで使用できるイベント ストリームが Kafka にプッシュされることから始まります。 Azure Event Hubs は、イベント ハブ上に Apache Kafka エンドポイントを提供します。これにより、ユーザーは Kafka プロトコルを使用してイベント ハブに接続できます。

この記事では、AKS 上の HDInsight における Apache Flink と Azure Event Hubs を接続する方法について検討し、以下の内容をカバーします。

  • Event Hubs 名前空間を作成する
  • Apache Flink を使用して AKS クラスターに HDInsight を作成する
  • Flink プロデューサーを実行する
  • Apache Flink 用パッケージ Jar
  • ジョブ提出 & の検証

Event Hubs 名前空間と Event Hubs を作成する

  1. Event Hubs 名前空間と Event Hubs を作成するには、 を参照してください。

    Event Hubs のセットアップを示すスクリーンショット。

  1. AKS クラスター プールで既存の HDInsight を使用すると、Flink クラスター を作成できます

  2. bootstrap.serversproducer.config 情報を追加して、Flink プロデューサーを実行します

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. {YOUR.EVENTHUBS.CONNECTION.STRING} を Event Hubs 名前空間の接続文字列に置き換えます。 接続文字列を取得する手順については、Event Hubs 接続文字列 取得する方法の詳細を参照してください。

    例えば

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. パッケージ com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Flink プロデューサーを実行するスニペットを追加します。

    Event Hubs で Flink をテストする方法を示すスクリーンショット。

  3. コードが実行されると、イベントはトピック "topic1" に格納されます。

    トピックに格納されている Event Hubs を示すスクリーンショット。

参考