AKS의 HDInsight에서 Apache Flink®에 Apache Kafka® 테이블 만들기
중요하다
AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 에 대해 더 알아보려면 이 공지을 확인하세요.
워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.
중요하다
이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 프리뷰에 대한 추가 사용 약관은 베타, 미리 보기 또는 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 더 많은 법적 조건을 포함합니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보 참조하세요. 질문 또는 기능 제안에 대한 자세한 내용은 AskHDInsight 대한 요청을 제출하고 Azure HDInsight Community 대한 자세한 업데이트를.
이 예제를 사용하여 Apache FlinkSQL에서 Kafka 테이블을 만드는 방법을 알아봅니다.
필수 구성 요소
- HDInsight Apache Kafka 클러스터
- AKS의 HDInsight에서 Apache Flink 클러스터
Apache Flink의 Kafka SQL 커넥터
Kafka 커넥터를 사용하면 Kafka 토픽에서 데이터를 읽고 데이터를 쓸 수 있습니다. 자세한 내용은 apache Kafka SQL Connector 참조하세요.
Flink SQL에서 Kafka 테이블 만들기
HDInsight Kafka에서 토픽 및 데이터 준비
weblog.py 사용하여 메시지 준비
import random
import json
import time
from datetime import datetime
user_set = [
'John',
'XiaoMing',
'Mike',
'Tom',
'Machael',
'Zheng Hu',
'Zark',
'Tim',
'Andrew',
'Pick',
'Sean',
'Luke',
'Chunck'
]
web_set = [
'https://google.com',
'https://facebook.com?id=1',
'https://tmall.com',
'https://baidu.com',
'https://taobao.com',
'https://aliyun.com',
'https://apache.com',
'https://flink.apache.com',
'https://hbase.apache.com',
'https://github.com',
'https://gmail.com',
'https://stackoverflow.com',
'https://python.org'
]
def main():
while True:
if random.randrange(10) < 4:
url = random.choice(web_set[:3])
else:
url = random.choice(web_set)
log_entry = {
'userName': random.choice(user_set),
'visitURL': url,
'ts': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
}
print(json.dumps(log_entry))
time.sleep(0.05)
if __name__ == "__main__":
main()
파이프라인에서 Kafka 토픽으로
sshuser@hn0-contsk:~$ python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
기타 명령:
-- create topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic click_events --bootstrap-server wn0-contsk:9092
-- delete topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic click_events --bootstrap-server wn0-contsk:9092
-- consume topic
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic click_events --from-beginning
{"userName": "Luke", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Tom", "visitURL": "https://stackoverflow.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Chunck", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Chunck", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Andrew", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Pick", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Mike", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Zheng Hu", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Luke", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:44"}
Apache Flink SQL 클라이언트
Flink SQL 클라이언트 Secure Shell을 사용하는 방법에 대한 자세한 지침이 제공됩니다.
Kafka SQL Connector & 종속성을 SSH에 다운로드
아래 단계에서 Kafka 3.2.0 종속성을 사용하고 있습니다. HDInsight 클러스터의 Kafka 버전에 따라 명령을 업데이트해야 합니다.
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Apache Flink SQL 클라이언트에 연결
이제 Kafka SQL 클라이언트 jar를 사용하여 Flink SQL 클라이언트에 연결해 보겠습니다.
msdata@pod-0 [ /opt/flink-webssh ]$ bin/sql-client.sh -j flink-connector-kafka-1.17.0.jar -j kafka-clients-3.2.0.jar
Apache Flink SQL에서 Kafka 테이블 만들기
Flink SQL에서 Kafka 테이블을 만들고 Flink SQL에서 Kafka 테이블을 선택하겠습니다.
아래 코드 조각에서 Kafka 부트스트랩 서버 IP를 업데이트해야 합니다.
CREATE TABLE KafkaTable (
`userName` STRING,
`visitURL` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'click_events',
'properties.bootstrap.servers' = '<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092',
'properties.group.id' = 'my_group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
select * from KafkaTable;
Flink SQL에서 Kafka 테이블을 만들고 선택하는 방법을 보여 주는
Kafka 메시지 생성
이제 HDInsight Kafka를 사용하여 동일한 토픽에 Kafka 메시지를 생성해 보겠습니다.
python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
Apache Flink SQL의 테이블
Flink SQL에서 테이블을 모니터링할 수 있습니다.
Flink SQL에서 테이블 날짜를 모니터링하는 방법을 보여 주는
다음은 Flink Web UI의 스트리밍 작업입니다.
참조
- Apache Kafka SQL 커넥터
- Apache, Apache Kafka, Kafka, Apache Flink, Flink 및 관련 오픈 소스 프로젝트 이름은 Apache Software Foundation(ASF)의 상표입니다.