AKS 上の HDInsight 上の Apache Flink® に Apache Kafka® テーブルを作成する
大事な
AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 についてさらに詳しく知りたい方は、このお知らせをご覧ください。
ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。
大事な
この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案がある場合は、詳細を記載の上、AskHDInsight にリクエストを送信してください。また、Azure HDInsight Communityをフォローして、更新情報を受け取ってください。
この例を使用して、Apache FlinkSQL で Kafka テーブルを作成する方法について説明します。
前提 条件
- Apache Kafka クラスター を HDInsight で
- AKS 上の HDInsight 上での Apache Flink クラスター
Apache Flink 上の Kafka SQL コネクタ
Kafka コネクタを使用すると、Kafka トピックからデータを読み取り、Kafka トピックにデータを書き込みます。 詳細については、「Apache Kafka SQL Connector」を参照してください。
Flink SQL で Kafka テーブルを作成する
HDInsight Kafka でトピックとデータを準備する
weblog.py を使用してメッセージを準備する
import random
import json
import time
from datetime import datetime
user_set = [
'John',
'XiaoMing',
'Mike',
'Tom',
'Machael',
'Zheng Hu',
'Zark',
'Tim',
'Andrew',
'Pick',
'Sean',
'Luke',
'Chunck'
]
web_set = [
'https://google.com',
'https://facebook.com?id=1',
'https://tmall.com',
'https://baidu.com',
'https://taobao.com',
'https://aliyun.com',
'https://apache.com',
'https://flink.apache.com',
'https://hbase.apache.com',
'https://github.com',
'https://gmail.com',
'https://stackoverflow.com',
'https://python.org'
]
def main():
while True:
if random.randrange(10) < 4:
url = random.choice(web_set[:3])
else:
url = random.choice(web_set)
log_entry = {
'userName': random.choice(user_set),
'visitURL': url,
'ts': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
}
print(json.dumps(log_entry))
time.sleep(0.05)
if __name__ == "__main__":
main()
Kafkaトピックへのパイプライン
sshuser@hn0-contsk:~$ python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
その他のコマンド:
-- create topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic click_events --bootstrap-server wn0-contsk:9092
-- delete topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic click_events --bootstrap-server wn0-contsk:9092
-- consume topic
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic click_events --from-beginning
{"userName": "Luke", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Tom", "visitURL": "https://stackoverflow.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Chunck", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Chunck", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Andrew", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Pick", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Mike", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Zheng Hu", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Luke", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:44"}
Apache Flink SQL クライアント
Flink SQL クライアント Secure Shell を使用する方法について詳しく説明します。
Kafka SQL Connector & 依存関係を SSH にダウンロードする
以下の手順では、Kafka 3.2.0 依存関係を使用しています。HDInsight クラスターの Kafka バージョンに基づいてコマンドを更新する必要があります。
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Apache Flink SQL クライアントに接続する
次に、Kafka SQL クライアント jar を使用して Flink SQL クライアントに接続してみましょう。
msdata@pod-0 [ /opt/flink-webssh ]$ bin/sql-client.sh -j flink-connector-kafka-1.17.0.jar -j kafka-clients-3.2.0.jar
Apache Flink SQL で Kafka テーブルを作成する
Flink SQL で Kafka テーブルを作成し、Flink SQL の Kafka テーブルを選択してみましょう。
以下のスニペットで Kafka ブートストラップ サーバー IP を更新する必要があります。
CREATE TABLE KafkaTable (
`userName` STRING,
`visitURL` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'click_events',
'properties.bootstrap.servers' = '<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092',
'properties.group.id' = 'my_group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
select * from KafkaTable;
Kafka メッセージを生成する
次に、HDInsight Kafka を使用して、同じトピックに対する Kafka メッセージを生成してみましょう。
python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
Apache Flink SQL のテーブル
Flink SQL でテーブルを監視できます。
Flink Web UI のストリーミング ジョブを次に示します。
参考
- Apache Kafka SQL Connector
- Apache、Apache Kafka、Kafka、Apache Flink、Flink、および関連するオープン ソース プロジェクト名は、Apache Software Foundation (ASF) の 商標です。