AKS 上の HDInsight 上の Apache Flink® を Azure Event Hubs for Apache Kafka® に接続する
大事な
AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 この発表 を元に、さらに詳しくをご覧ください。
ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。
大事な
この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案については、詳細を記載して AskHDInsight にリクエストを送信し、Azure HDInsight Community をフォローして最新情報をぜひご確認ください。
Apache Flink のよく知られているユース ケースは、ストリーム分析です。 多くのユーザーが Apache Kafka を使用して取り込まれるデータストリームを使用することを一般的に選んでいます。 Flink と Kafka の一般的なインストールは、Flink ジョブで使用できるイベント ストリームが Kafka にプッシュされることから始まります。 Azure Event Hubs は、イベント ハブ上に Apache Kafka エンドポイントを提供します。これにより、ユーザーは Kafka プロトコルを使用してイベント ハブに接続できます。
この記事では、AKS 上の HDInsight における Apache Flink と Azure Event Hubs を接続する方法について検討し、以下の内容をカバーします。
- Event Hubs 名前空間を作成する
- Apache Flink を使用して AKS クラスターに HDInsight を作成する
- Flink プロデューサーを実行する
- Apache Flink 用パッケージ Jar
- ジョブ提出 & の検証
Event Hubs 名前空間と Event Hubs を作成する
Event Hubs 名前空間と Event Hubs を作成するには、の を参照してください。
AKS 上の HDInsight で Flink クラスターを設定する
AKS クラスター プールで既存の HDInsight を使用すると、Flink クラスター を作成できます
bootstrap.servers と
producer.config
情報を追加して、Flink プロデューサーを実行しますbootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
{YOUR.EVENTHUBS.CONNECTION.STRING}
を Event Hubs 名前空間の接続文字列に置き換えます。 接続文字列を取得する手順については、Event Hubs 接続文字列 取得する方法の詳細を参照してください。例えば
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Flink 用 JAR のパッケージ化
パッケージ com.example.app;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Flink プロデューサーを実行するスニペットを追加します。
コードが実行されると、イベントはトピック "topic1" に格納されます。
参考
- Apache Flink ウェブサイト
- Apache、Apache Kafka、Kafka、Apache Flink、Flink、および関連するオープンソースプロジェクト名は、Apache Software Foundation (ASF) の商標です。