Tworzenie tabeli Apache Kafka® na Apache Flink® w HDInsight na AKS
Ważny
Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej dzięki temu ogłoszeniu .
Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.
Ważny
Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe warunki użytkowania platformy Microsoft Azure zawierają więcej warunków prawnych, które dotyczą funkcji platformy Azure będących w wersji beta, zapoznawczej lub które jeszcze nie są ogólnodostępne. Aby uzyskać informacje na temat tej konkretnej wersji zapoznawczej, zobacz informacje o wersji zapoznawczej Azure HDInsight na AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie na AskHDInsight z szczegółami i obserwuj nas po więcej aktualizacji na Azure HDInsight Community.
Korzystając z tego przykładu, dowiedz się, jak utworzyć tabelę platformy Kafka w bazie danych Apache FlinkSQL.
Warunki wstępne
Łącznik SQL dla Apache Kafka w Apache Flink
Łącznik Kafka umożliwia odczytywanie danych z tematów Kafka i zapisywanie ich do tych tematów. Aby uzyskać więcej informacji, zobacz Apache Kafka SQL Connector.
Tworzenie tabeli platformy Kafka w języku Flink SQL
Przygotowanie tematu i danych w HDInsight Kafka
przygotowywanie komunikatów przy użyciu weblog.py
import random
import json
import time
from datetime import datetime
user_set = [
'John',
'XiaoMing',
'Mike',
'Tom',
'Machael',
'Zheng Hu',
'Zark',
'Tim',
'Andrew',
'Pick',
'Sean',
'Luke',
'Chunck'
]
web_set = [
'https://google.com',
'https://facebook.com?id=1',
'https://tmall.com',
'https://baidu.com',
'https://taobao.com',
'https://aliyun.com',
'https://apache.com',
'https://flink.apache.com',
'https://hbase.apache.com',
'https://github.com',
'https://gmail.com',
'https://stackoverflow.com',
'https://python.org'
]
def main():
while True:
if random.randrange(10) < 4:
url = random.choice(web_set[:3])
else:
url = random.choice(web_set)
log_entry = {
'userName': random.choice(user_set),
'visitURL': url,
'ts': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
}
print(json.dumps(log_entry))
time.sleep(0.05)
if __name__ == "__main__":
main()
ścieżka do tematu Kafka
sshuser@hn0-contsk:~$ python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
Inne polecenia:
-- create topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic click_events --bootstrap-server wn0-contsk:9092
-- delete topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic click_events --bootstrap-server wn0-contsk:9092
-- consume topic
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic click_events --from-beginning
{"userName": "Luke", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Tom", "visitURL": "https://stackoverflow.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Chunck", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Chunck", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Andrew", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Pick", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Mike", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Zheng Hu", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Luke", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:44"}
Klient SQL Apache Flink
Szczegółowe instrukcje są przedstawione na temat sposobu używania protokołu Secure Shell dla klienta Flink SQL.
Pobierz zależności łącznika SQL dla Kafka & na SSH
W poniższym kroku używamy zależności platformy Kafka 3.2.0. Musisz zaktualizować polecenie na podstawie wersji platformy Kafka w klastrze usługi HDInsight.
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Nawiązywanie połączenia z klientem SQL apache Flink
Teraz nawiążmy połączenie z klientem SQL Flink przy użyciu plików JAR klienta SQL platformy Kafka.
msdata@pod-0 [ /opt/flink-webssh ]$ bin/sql-client.sh -j flink-connector-kafka-1.17.0.jar -j kafka-clients-3.2.0.jar
Tworzenie tabeli Kafka w Apache Flink SQL
Utwórzmy tabelę Kafka w języku Flink SQL i wybierzmy tabelę Kafka w języku Flink SQL.
Musisz zaktualizować adresy IP serwera bootstrap platformy Kafka w poniższym fragmencie kodu.
CREATE TABLE KafkaTable (
`userName` STRING,
`visitURL` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'click_events',
'properties.bootstrap.servers' = '<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092',
'properties.group.id' = 'my_group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
select * from KafkaTable;
Tworzenie komunikatów platformy Kafka
Teraz utwórzmy komunikaty platformy Kafka do tego samego tematu przy użyciu platformy Kafka w usłudze HDInsight.
python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
Tabela w języku Apache Flink SQL
Tabelę można monitorować w języku Flink SQL.
Poniżej przedstawiono zadania przesyłania strumieniowego w interfejsie użytkownika sieci Web Flink.
Odniesienie
- Apache Kafka łącznik SQL
- Nazwy projektów Apache, Apache Kafka, Apache Flink, Flink oraz powiązane z nimi nazwy projektów open source są znakami towarowymi należącymi do Apache Software Foundation (ASF).