Vytvoření tabulky Apache Kafka® v Apache Flinku® ve službě HDInsight v AKS
Důležitý
Azure HDInsight v AKS byl vyřazen 31. ledna 2025. Další informace s tímto oznámením.
Abyste se vyhnuli náhlému ukončení úloh, musíte migrovat úlohy do Microsoft Fabric nebo ekvivalentního produktu Azure.
Důležitý
Tato funkce je aktuálně ve verzi Preview. Doplňkové podmínky použití pro verze Preview Microsoft Azure obsahují další právní podmínky, které se vztahují na funkce Azure, jež jsou v beta verzi, ve verzi Preview nebo ještě nebyly vydány jako všeobecně dostupné. Informace o této konkrétní verzi Preview najdete v tématu Azure HDInsight na AKS. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost o AskHDInsight s podrobnostmi a sledujte nás o dalších aktualizacích komunity Azure HDInsight.
V tomto příkladu se dozvíte, jak vytvořit tabulku Kafka v Apache FlinkSQL.
Požadavky
Konektor Kafka SQL pro Apache Flink
Konektor Kafka umožňuje čtení dat z témat Kafka a zápis dat do nich. Další informace naleznete v konektoru SQL Apache Kafka .
Vytvoření tabulky Kafka v Flink SQL
Příprava tématu a dat ve službě HDInsight Kafka
Příprava zpráv pomocí weblog.py
import random
import json
import time
from datetime import datetime
user_set = [
'John',
'XiaoMing',
'Mike',
'Tom',
'Machael',
'Zheng Hu',
'Zark',
'Tim',
'Andrew',
'Pick',
'Sean',
'Luke',
'Chunck'
]
web_set = [
'https://google.com',
'https://facebook.com?id=1',
'https://tmall.com',
'https://baidu.com',
'https://taobao.com',
'https://aliyun.com',
'https://apache.com',
'https://flink.apache.com',
'https://hbase.apache.com',
'https://github.com',
'https://gmail.com',
'https://stackoverflow.com',
'https://python.org'
]
def main():
while True:
if random.randrange(10) < 4:
url = random.choice(web_set[:3])
else:
url = random.choice(web_set)
log_entry = {
'userName': random.choice(user_set),
'visitURL': url,
'ts': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
}
print(json.dumps(log_entry))
time.sleep(0.05)
if __name__ == "__main__":
main()
Pipeline do Kafka tématu
sshuser@hn0-contsk:~$ python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
Další příkazy:
-- create topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic click_events --bootstrap-server wn0-contsk:9092
-- delete topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic click_events --bootstrap-server wn0-contsk:9092
-- consume topic
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic click_events --from-beginning
{"userName": "Luke", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Tom", "visitURL": "https://stackoverflow.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Chunck", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Chunck", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Andrew", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Pick", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Mike", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Zheng Hu", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Luke", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:44"}
SQL klient Apache Flink
Podrobné pokyny jsou uvedeny o tom, jak používat Secure Shell pro klienta Flink SQL.
Stažení konektoru Kafka SQL & závislostí do SSH
V následujícím kroku používáme Kafka 3.2.0 závislostí, musíte příkaz aktualizovat na základě vaší verze Kafka v clusteru HDInsight.
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Připojení k klientovi Apache Flink SQL
Teď se připojíme k klientovi Flink SQL pomocí souborů JAR klienta Kafka SQL.
msdata@pod-0 [ /opt/flink-webssh ]$ bin/sql-client.sh -j flink-connector-kafka-1.17.0.jar -j kafka-clients-3.2.0.jar
Vytvoření tabulky Kafka v Apache Flink SQL
Vytvoříme tabulku Kafka v Flink SQL a vybereme tabulku Kafka v Flink SQL.
V následujícím fragmentu kódu musíte aktualizovat IP adresy serveru Kafka bootstrap.
CREATE TABLE KafkaTable (
`userName` STRING,
`visitURL` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'click_events',
'properties.bootstrap.servers' = '<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092',
'properties.group.id' = 'my_group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
select * from KafkaTable;
Vytváření zpráv Kafka
Teď vytvoříme zprávy Kafka do stejného tématu pomocí HDInsight Kafka.
python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
Tabulka v Apache Flink SQL
Tabulku můžete monitorovat v Flink SQL.
Tady jsou úlohy streamování ve webovém uživatelském rozhraní Flink.
Odkaz
- SQL konektor pro Apache Kafka
- Názvy projektů Apache, Apache Kafka, Apache Flink, Flink a přidružených open source projektů jsou ochranné známkyApache Software Foundation (ASF).