Sdílet prostřednictvím


Vytvoření tabulky Apache Kafka® v Apache Flinku® ve službě HDInsight v AKS

Důležitý

Azure HDInsight v AKS byl vyřazen 31. ledna 2025. Další informace s tímto oznámením.

Abyste se vyhnuli náhlému ukončení úloh, musíte migrovat úlohy do Microsoft Fabric nebo ekvivalentního produktu Azure.

Důležitý

Tato funkce je aktuálně ve verzi Preview. Doplňkové podmínky použití pro verze Preview Microsoft Azure obsahují další právní podmínky, které se vztahují na funkce Azure, jež jsou v beta verzi, ve verzi Preview nebo ještě nebyly vydány jako všeobecně dostupné. Informace o této konkrétní verzi Preview najdete v tématu Azure HDInsight na AKS. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost o AskHDInsight s podrobnostmi a sledujte nás o dalších aktualizacích komunity Azure HDInsight.

V tomto příkladu se dozvíte, jak vytvořit tabulku Kafka v Apache FlinkSQL.

Požadavky

Konektor Kafka umožňuje čtení dat z témat Kafka a zápis dat do nich. Další informace naleznete v konektoru SQL Apache Kafka .

Příprava tématu a dat ve službě HDInsight Kafka

Příprava zpráv pomocí weblog.py

import random
import json
import time
from datetime import datetime

user_set = [
        'John',
        'XiaoMing',
        'Mike',
        'Tom',
        'Machael',
        'Zheng Hu',
        'Zark',
        'Tim',
        'Andrew',
        'Pick',
        'Sean',
        'Luke',
        'Chunck'
]

web_set = [
        'https://google.com',
        'https://facebook.com?id=1',
        'https://tmall.com',
        'https://baidu.com',
        'https://taobao.com',
        'https://aliyun.com',
        'https://apache.com',
        'https://flink.apache.com',
        'https://hbase.apache.com',
        'https://github.com',
        'https://gmail.com',
        'https://stackoverflow.com',
        'https://python.org'
]

def main():
        while True:
                if random.randrange(10) < 4:
                        url = random.choice(web_set[:3])
                else:
                        url = random.choice(web_set)

                log_entry = {
                        'userName': random.choice(user_set),
                        'visitURL': url,
                        'ts': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
                }

                print(json.dumps(log_entry))
                time.sleep(0.05)

if __name__ == "__main__":
    main()

Pipeline do Kafka tématu

sshuser@hn0-contsk:~$ python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events

Další příkazy:

-- create topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic click_events --bootstrap-server wn0-contsk:9092

-- delete topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete  --topic click_events --bootstrap-server wn0-contsk:9092

-- consume topic
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic click_events --from-beginning
{"userName": "Luke", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Tom", "visitURL": "https://stackoverflow.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Chunck", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Chunck", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Andrew", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Pick", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Mike", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Zheng Hu", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Luke", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:44"}

Podrobné pokyny jsou uvedeny o tom, jak používat Secure Shell pro klienta Flink SQL.

Stažení konektoru Kafka SQL & závislostí do SSH

V následujícím kroku používáme Kafka 3.2.0 závislostí, musíte příkaz aktualizovat na základě vaší verze Kafka v clusteru HDInsight.

wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Teď se připojíme k klientovi Flink SQL pomocí souborů JAR klienta Kafka SQL.

msdata@pod-0 [ /opt/flink-webssh ]$ bin/sql-client.sh -j flink-connector-kafka-1.17.0.jar -j kafka-clients-3.2.0.jar

Vytvoříme tabulku Kafka v Flink SQL a vybereme tabulku Kafka v Flink SQL.

V následujícím fragmentu kódu musíte aktualizovat IP adresy serveru Kafka bootstrap.

CREATE TABLE KafkaTable (
`userName` STRING,
`visitURL` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'click_events',
'properties.bootstrap.servers' = '<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092',
'properties.group.id' = 'my_group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);

select * from KafkaTable;

Snímek obrazovky znázorňující, jak vytvořit a vybrat tabulku Kafka v Flink SQL

Vytváření zpráv Kafka

Teď vytvoříme zprávy Kafka do stejného tématu pomocí HDInsight Kafka.

python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events

Tabulku můžete monitorovat v Flink SQL.

snímek obrazovky znázorňující, jak monitorovat datum tabulky v Flink SQL

Tady jsou úlohy streamování ve webovém uživatelském rozhraní Flink.

Snímek obrazovky zobrazující úlohy ve webovém uživatelském rozhraní Flink

Odkaz