Создание таблицы Apache Kafka® на Apache Flink® в HDInsight на AKS
Важный
Azure HDInsight на AKS прекращено 31 января 2025 г. Узнайте больше о из этого объявления.
Необходимо перенести рабочие нагрузки в Microsoft Fabric или эквивалентный продукт Azure, чтобы избежать резкого завершения рабочих нагрузок.
Важный
Эта функция сейчас доступна в предварительной версии. Дополнительные условия использования для предварительных версий Microsoft Azure включают дополнительные юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в Azure HDInsight в предварительной версии AKS. Для вопросов или предложений функций отправьте запрос на AskHDInsight с подробными сведениями и следуйте за дополнительными обновлениями в Azure HDInsight Community.
В этом примере вы узнаете, как создать таблицу Kafka в Apache FlinkSQL.
Необходимые условия
Соединитель Kafka SQL для Apache Flink
Соединитель Kafka позволяет считывать данные из и записывать данные в разделы Kafka. Дополнительную информацию см. в коннекторе Apache Kafka SQL.
Создание таблицы Kafka в Flink SQL
Подготовить топик и данные в HDInsight Kafka
Подготовка сообщений с помощью weblog.py
import random
import json
import time
from datetime import datetime
user_set = [
'John',
'XiaoMing',
'Mike',
'Tom',
'Machael',
'Zheng Hu',
'Zark',
'Tim',
'Andrew',
'Pick',
'Sean',
'Luke',
'Chunck'
]
web_set = [
'https://google.com',
'https://facebook.com?id=1',
'https://tmall.com',
'https://baidu.com',
'https://taobao.com',
'https://aliyun.com',
'https://apache.com',
'https://flink.apache.com',
'https://hbase.apache.com',
'https://github.com',
'https://gmail.com',
'https://stackoverflow.com',
'https://python.org'
]
def main():
while True:
if random.randrange(10) < 4:
url = random.choice(web_set[:3])
else:
url = random.choice(web_set)
log_entry = {
'userName': random.choice(user_set),
'visitURL': url,
'ts': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
}
print(json.dumps(log_entry))
time.sleep(0.05)
if __name__ == "__main__":
main()
Конвейер данных на тему Kafka
sshuser@hn0-contsk:~$ python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
Другие команды:
-- create topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic click_events --bootstrap-server wn0-contsk:9092
-- delete topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic click_events --bootstrap-server wn0-contsk:9092
-- consume topic
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic click_events --from-beginning
{"userName": "Luke", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Tom", "visitURL": "https://stackoverflow.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Chunck", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Chunck", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Andrew", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Pick", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Mike", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Zheng Hu", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Luke", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:44"}
Клиент Apache Flink SQL
Инструкции о том, как использовать Secure Shell для клиента Flink SQL , приведены подробно.
Скачайте зависимости Kafka SQL коннектора & через SSH.
Мы используем зависимости Kafka 3.2.0 в следующем шаге. Вы должны обновить команду в соответствии с вашей версией Kafka в кластере HDInsight.
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Подключение к клиенту Apache Flink SQL
Теперь давайте подключимся к клиенту Flink SQL с помощью jar-файлов клиента Kafka SQL.
msdata@pod-0 [ /opt/flink-webssh ]$ bin/sql-client.sh -j flink-connector-kafka-1.17.0.jar -j kafka-clients-3.2.0.jar
Создание таблицы Kafka в Apache Flink SQL
Давайте создадим таблицу Kafka на Flink SQL и выберите таблицу Kafka в Flink SQL.
Вам потребуется обновить IP-адреса сервера начальной загрузки Kafka в приведенном ниже фрагменте кода.
CREATE TABLE KafkaTable (
`userName` STRING,
`visitURL` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'click_events',
'properties.bootstrap.servers' = '<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092',
'properties.group.id' = 'my_group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
select * from KafkaTable;
Создание сообщений Kafka
Теперь давайте создадим сообщения Kafka в том же разделе, используя HDInsight Kafka.
python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
Таблица в Apache Flink SQL
Вы можете отслеживать таблицу в Flink SQL.
Ниже приведены задания потоковой передачи в веб-интерфейсе Flink.
Ссылка
- SQL-Коннектор для Apache Kafka
- Apache, Apache Kafka, Kafka, Apache Flink, Flink и связанные имена открытых проектов являются товарными знакамиApache Software Foundation (ASF).