다음을 통해 공유


AKS의 HDInsight에서 Apache Flink®와 Hive 카탈로그를 사용하는 방법

중요하다

AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 이 에 대한공지를 자세히 알아보세요.

워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.

중요하다

이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 미리 보기의 추가 사용 약관에는 아직 일반 공급으로 배포되지 않은 베타, 미리 보기 Azure 기능에 적용되는 더 많은 법적 조건이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보 을 참조하세요. 질문이나 기능 제안이 있으시면 AskHDInsight에 요청을 제출해 주시고, 더 많은 업데이트를 원하시면 Azure HDInsight Community를 팔로우해 주세요.

이 예제에서는 Apache Flink의 Hive 카탈로그를 사용하여 Hive의 Metastore를 영구 카탈로그로 사용합니다. 이 기능은 세션 간에 Flink에 Kafka 테이블 및 MySQL 테이블 메타데이터를 저장하는 데 사용합니다. Flink는 Hive 카탈로그에 등록된 Kafka 테이블을 원본으로 사용하고 일부 조회 및 싱크 결과를 MySQL 데이터베이스에 수행합니다.

필수 구성 요소

  • Apache Flink 클러스터를 AKS의 HDInsight에서 Hive Metastore 3.1.2와 함께 사용하기
  • Apache Kafka 클러스터 HDInsight
    • Kafka 사용에서 설명한 대로 네트워크 설정이 완료되었는지 확인해야 합니다. 이는 AKS의 HDInsight와 HDInsight 클러스터가 동일한 VNet에 있는지 확인하기 위한 것입니다.
  • MySQL 8.0.33

Flink는 Hive와 2배 통합을 제공합니다.

  • 첫 번째 단계는 Flink의 HiveCatalog를 사용하여 세션 간에 Flink 특정 메타데이터를 저장하기 위해 HMS(Hive Metastore)를 영구 카탈로그로 사용하는 것입니다.
    • 예를 들어 사용자는 HiveCatalog를 사용하여 Hive Metastore에 Kafka 또는 ElasticSearch 테이블을 저장하고 나중에 SQL 쿼리에서 다시 사용할 수 있습니다.
  • 두 번째는 Hive 테이블을 읽고 쓰기 위한 대체 엔진으로 Flink를 제공하는 것입니다.
  • HiveCatalog는 기존 Hive 설치와 별도의 설정 없이 즉시 사용할 수 있도록 설계되었습니다. 기존 Hive Metastore를 수정하거나 테이블의 데이터 배치 또는 분할을 변경할 필요가 없습니다.

자세한 내용은 apache Hive 참조하세요.

환경 준비

Azure Portal에서 HMS를 사용하여 Apache Flink 클러스터를 만들 수 있습니다. Flink 클러스터 만들기대한 자세한 지침을 참조할 수 있습니다.

Flink 클러스터를 만드는 방법을 보여 주는 스크린샷

클러스터를 만든 후 AKS 쪽에서 HMS가 실행 중인지 여부를 확인합니다.

Flink 클러스터에서 HMS 상태를 확인하는 방법을 보여 주는 스크린샷

HDInsight에서 사용자 주문 트랜잭션 데이터 Kafka 항목 준비

다음 명령을 사용하여 kafka 클라이언트 jar를 다운로드합니다.

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

tar 파일을 압축 해제합니다.

tar -xvf kafka_2.12-3.2.0.tgz

Kafka 토픽에 메시지를 생성합니다.

Kafka 토픽에 메시지를 생성하는 방법을 보여 주는 스크린샷

기타 명령:

메모

부트스트랩 서버를 고유한 kafka broker 호스트 이름 또는 IP로 바꾸어야 합니다.

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Azure의 MySQL에서 사용자 주문 마스터 데이터 준비

DB 테스트:

Kafka에서 데이터베이스를 테스트하는 방법을 보여 주는 스크린샷

포털에서 Cloud Shell을 실행하는 방법을 보여 주는 스크린샷

주문 테이블 준비:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

SSH를 사용하여 필수적인 Kafka 커넥터 및 MySQL 데이터베이스 jar 파일을 다운로드하십시오.

메모

HDInsight kafka 버전 및 MySQL 버전에 따라 올바른 버전 jar을 다운로드합니다.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

플래너 항아리 이동

jar 파일 flink-table-planner_2.12-1.17.0-....jar를 webssh Pod의 /opt에서 /lib로 이동하고, /lib에 있는 jar 파일 flink-table-planner-loader-1.17.0-....jar를 /opt/flink-webssh/opt/로 이동합니다. 자세한 내용은 문제 참조하세요. Planner jar를 이동하려면 다음 단계를 수행합니다.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

메모

Hive 언어 또는 HiveServer2 엔드포인트를 사용하는 경우에만 추가 Planner jar의 이동이 필요합니다. 그러나 Hive 통합에 권장되는 설정입니다.

유효성 검사

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

메모

이미 Hive Metastore에서 Flink 클러스터를 사용하므로 추가 구성을 수행할 필요가 없습니다.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Kafka 테이블을 만드는 방법을 보여 주는 스크린샷

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

mysql 테이블을 만드는 방법을 보여 주는 스크린샷

테이블 출력을 보여 주는 스크린샷

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

사용자 트랜잭션을 싱크하는 방법을 보여 주는 스크린샷

Flink UI를 보여 주는 스크린샷

Kafka의 사용자 트랜잭션 주문 데이터가 Azure Cloud Shell의 MySQL에서 마스터 테이블 순서로 추가되었는지 확인합니다.

사용자 트랜잭션을 확인하는 방법을 보여 주는 스크린샷

Kafka에서 세 가지 사용자 주문 만들기

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Kafka 테이블 데이터를 확인하는 방법을 보여 주는 스크린샷

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

주문 테이블을 확인하는 방법을 보여 주는 스크린샷

Azure Cloud Shell의 MySQL에서 주문 테이블에 product_id = 104 레코드가 추가되었는지 확인하십시오.

주문 테이블에 추가된 레코드를 보여 주는 스크린샷

참조