次の方法で共有


HDInsight on AKS の Apache Flink® と Apache Kafka® 用の Azure Event Hubs を接続する

Note

Azure HDInsight on AKS は 2025 年 1 月 31 日に廃止されます。 2025 年 1 月 31 日より前に、ワークロードを Microsoft Fabric または同等の Azure 製品に移行することで、ワークロードの突然の終了を回避する必要があります。 サブスクリプション上に残っているクラスターは停止され、ホストから削除されることになります。

提供終了日までは基本サポートのみが利用できます。

重要

現在、この機能はプレビュー段階にあります。 ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用されるその他の法律条項については、「Microsoft Azure プレビューの追加の使用条件」に記載されています。 この特定のプレビューについては、「Microsoft HDInsight on AKS のプレビュー情報」を参照してください。 質問や機能の提案については、詳細を記載した要求を AskHDInsight で送信してください。また、その他の更新情報については、Azure HDInsight コミュニティをフォローしてください。

Apache Flink のよく知られているユース ケースは、ストリーム分析です。 Apache Kafka を使用して取り込まれるデータ ストリームを使用する多くのユーザーが一般的に選択しています。 Flink と Kafka の一般的なインストールは、Flink ジョブで使用できるイベント ストリームが Kafka にプッシュされることから始まります。 Azure Event Hubs では、イベント ハブ上に Apache Kafka エンドポイントを提供します。これにより、ユーザーは Kafka プロトコルを使用してイベント ハブに接続できます。

この記事では、Azure Event HubsHDInsight on AKS の Apache Flink を接続する方法について説明し、以下について説明します

  • Event Hubs 名前空間を作成します
  • Apache Flink を使用して HDInsight on AKS クラスターを作成する
  • Flink プロデューサーを実行する
  • Apache Flink 用の Jar をパッケージ化する
  • ジョブの送信と検証

Event Hubs 名前空間と Event Hubs を作成する

  1. Event Hubs 名前空間と Event Hubs を作成するには、こちらを参照してください

    Event Hubs セットアップを示すスクリーンショット。

  1. 既存の HDInsight on AKS クラスター プールを使用して、Flink クラスターを作成できます

  2. bootstrap.serversproducer.config 情報を追加して、Flink プロデューサーを実行します

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. {YOUR.EVENTHUBS.CONNECTION.STRING} を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順について詳しくは、「Event Hubs の接続文字列の取得」を参照してください。

    たとえば、 にします。

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. com.example.app をパッケージ化します。

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Flink プロデューサーを実行するスニペットを追加します。

    Event Hubs で Flink をテストする方法を示すスクリーンショット。

  3. コードが実行されると、イベントはトピック "topic1" に格納されます

    トピックに格納された Event Hubs を示すスクリーンショット。

リファレンス