AKS의 HDInsight에서 Apache Flink®와 Hive 카탈로그를 사용하는 방법
중요하다
AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 이 에 대한공지를 자세히 알아보세요.
워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.
중요하다
이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 미리 보기의 추가 사용 약관에는 아직 일반 공급으로 배포되지 않은 베타, 미리 보기 Azure 기능에 적용되는 더 많은 법적 조건이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보 을 참조하세요. 질문이나 기능 제안이 있으시면 AskHDInsight에 요청을 제출해 주시고, 더 많은 업데이트를 원하시면 Azure HDInsight Community를 팔로우해 주세요.
이 예제에서는 Apache Flink의 Hive 카탈로그를 사용하여 Hive의 Metastore를 영구 카탈로그로 사용합니다. 이 기능은 세션 간에 Flink에 Kafka 테이블 및 MySQL 테이블 메타데이터를 저장하는 데 사용합니다. Flink는 Hive 카탈로그에 등록된 Kafka 테이블을 원본으로 사용하고 일부 조회 및 싱크 결과를 MySQL 데이터베이스에 수행합니다.
필수 구성 요소
- Apache Flink 클러스터를 AKS의 HDInsight에서 Hive Metastore 3.1.2와 함께 사용하기
- Apache Kafka 클러스터 HDInsight
- Kafka 사용에서 설명한 대로 네트워크 설정이 완료되었는지 확인해야 합니다. 이는 AKS의 HDInsight와 HDInsight 클러스터가 동일한 VNet에 있는지 확인하기 위한 것입니다.
- MySQL 8.0.33
Apache Flink에서 Apache Hive 운영
Flink는 Hive와 2배 통합을 제공합니다.
- 첫 번째 단계는 Flink의 HiveCatalog를 사용하여 세션 간에 Flink 특정 메타데이터를 저장하기 위해 HMS(Hive Metastore)를 영구 카탈로그로 사용하는 것입니다.
- 예를 들어 사용자는 HiveCatalog를 사용하여 Hive Metastore에 Kafka 또는 ElasticSearch 테이블을 저장하고 나중에 SQL 쿼리에서 다시 사용할 수 있습니다.
- 두 번째는 Hive 테이블을 읽고 쓰기 위한 대체 엔진으로 Flink를 제공하는 것입니다.
- HiveCatalog는 기존 Hive 설치와 별도의 설정 없이 즉시 사용할 수 있도록 설계되었습니다. 기존 Hive Metastore를 수정하거나 테이블의 데이터 배치 또는 분할을 변경할 필요가 없습니다.
환경 준비
HMS를 사용하여 Apache Flink 클러스터 만들기
Azure Portal에서 HMS를 사용하여 Apache Flink 클러스터를 만들 수 있습니다. Flink 클러스터 만들기대한 자세한 지침을 참조할 수 있습니다.
클러스터를 만든 후 AKS 쪽에서 HMS가 실행 중인지 여부를 확인합니다.
HDInsight에서 사용자 주문 트랜잭션 데이터 Kafka 항목 준비
다음 명령을 사용하여 kafka 클라이언트 jar를 다운로드합니다.
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
tar 파일을 압축 해제합니다.
tar -xvf kafka_2.12-3.2.0.tgz
Kafka 토픽에 메시지를 생성합니다.
기타 명령:
메모
부트스트랩 서버를 고유한 kafka broker 호스트 이름 또는 IP로 바꾸어야 합니다.
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Azure의 MySQL에서 사용자 주문 마스터 데이터 준비
DB 테스트:
주문 테이블 준비:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
SSH를 사용하여 필수적인 Kafka 커넥터 및 MySQL 데이터베이스 jar 파일을 다운로드하십시오.
메모
HDInsight kafka 버전 및 MySQL 버전에 따라 올바른 버전 jar을 다운로드합니다.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
플래너 항아리 이동
jar 파일 flink-table-planner_2.12-1.17.0-....jar를 webssh Pod의 /opt에서 /lib로 이동하고, /lib에 있는 jar 파일 flink-table-planner-loader-1.17.0-....jar를 /opt/flink-webssh/opt/로 이동합니다. 자세한 내용은 문제 참조하세요. Planner jar를 이동하려면 다음 단계를 수행합니다.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
메모
Hive 언어 또는 HiveServer2 엔드포인트를 사용하는 경우에만 추가 Planner jar의 이동이 필요합니다. 그러나 Hive 통합에 권장되는 설정입니다.
유효성 검사
bin/sql-client.sh 사용하여 Flink SQL에 연결
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Hive 카탈로그를 만들고 Flink SQL에서 Hive 카탈로그에 연결
메모
이미 Hive Metastore에서 Flink 클러스터를 사용하므로 추가 구성을 수행할 필요가 없습니다.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Apache Flink SQL에서 Kafka 테이블 만들기
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Apache Flink SQL에서 MySQL 테이블 만들기
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Flink SQL에서 위의 Hive 카탈로그에 등록된 테이블 확인
Flink SQL의 MySQL에서 마스터 주문 테이블에 사용자 트랜잭션 주문 정보 싱크
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Kafka의 사용자 트랜잭션 주문 데이터가 Azure Cloud Shell의 MySQL에서 마스터 테이블 순서로 추가되었는지 확인합니다.
Kafka에서 세 가지 사용자 주문 만들기
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL에서 Kafka 테이블 데이터 확인
Flink SQL> select * from kafka_user_orders;
Flink SQL의 MySQL에서 orders 테이블에 product_id=104
삽입
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Azure Cloud Shell의 MySQL에서 주문 테이블에 product_id = 104
레코드가 추가되었는지 확인하십시오.
참조
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink 및 관련 오픈 소스 프로젝트 이름은 Apache Software Foundation(ASF)의 상표입니다.