Come usare Hive Catalog con Apache Flink® in HDinsight su AKS
Nota
Azure HDInsight su AKS verrà ritirato il 31 gennaio 2025. Prima del 31 gennaio 2025, sarà necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare interruzioni improvvise dei carichi di lavoro. I cluster rimanenti nella sottoscrizione verranno arrestati e rimossi dall’host.
Solo il supporto di base sarà disponibile fino alla data di ritiro.
Importante
Questa funzionalità è attualmente disponibile solo in anteprima. Le Condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure includono termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale. Per informazioni su questa anteprima specifica, vedere Informazioni sull'anteprima di Azure HDInsight nel servizio Azure Kubernetes. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire Microsoft per altri aggiornamenti nella Community di Azure HDInsight.
Questo esempio usa il metastore di Hive come catalogo permanente con Hive Catalog di Apache Flink’. Questa funzionalità viene usata per archiviare i metadati della tabella Kafka e della tabella MySQL in Flink tra una sessione e l'altra. Flink usa la tabella Kafka registrata in Hive CAtalog come origine, esegue alcune ricerche ed esegue quindi il sink dei risultati nel database MySQL
Prerequisiti
- Cluster Apache Flink in HDInsight su AKS con Metastore Hive 3.1.2
- Cluster Apache Kafka in HDInsight
- È necessario assicurarsi che le impostazioni di rete siano state configurate come descritto in Uso di Kafka per assicurarsi che HDInsight nei cluster del servizio Azure Kubernetes e HDInsight si trovino nella stessa rete virtuale
- MySQL 8.0.33
Apache Hive in Apache Flink
Flink offre un'integrazione bidirezionale con Hive.
- Il primo passaggio consiste nell'usare Hive Metastore (HMS) come catalogo permanente con HiveCatalog di Flink per l'archiviazione di metadati specifici Flink tra una sessione e l'altra.
- Ad esempio, gli utenti possono archiviare le tabelle Kafka o ElasticSearch in Metastore Hive usando HiveCatalog e riutilizzarle in un secondo momento nelle query SQL.
- Il secondo consiste nell'offrire Flink come motore alternativo per la lettura e la scrittura di tabelle Hive.
- HiveCatalog è progettato per essere “compatibile” con le installazioni Hive esistenti. Non è necessario modificare il metastore Hive esistente o modificare il posizionamento o il partizionamento dei dati delle tabelle.
Per altre informazioni, vedere Apache Hive
Preparazione dell'ambiente
Creare un cluster Apache Flink con HMS
Consente di creare un cluster Apache Flink con HMS nel portale di Azure, è possibile fare riferimento alle istruzioni dettagliate in Creazione del cluster Flink.
Dopo la creazione del cluster, verificare che HMS sia in esecuzione o meno sul lato servizio Azure Kubernetes.
Preparare l'argomento Kafka sui dati delle transazioni degli ordini utente in HDInsight
Scaricare il file JAR del client Kafka usando il comando seguente:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Decomprimere il file TAR con
tar -xvf kafka_2.12-3.2.0.tgz
Generare i messaggi per l'argomento Kafka.
Altri comandi:
Nota
È necessario sostituire bootstrap-server con il proprio nome host o IP dei broker Kafka
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Preparare i dati master degli ordini utente in MySQL in Azure
Test di DB:
Preparare la tabella degli ordini:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Uso del download SSH necessario per il connettore Kafka e i file JAR del database MySQL
Nota
Scaricare il file JAR della versione corretta in base alla versione Kafka di HDInsight e alla versione di MySQL.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Spostamento del file JAR di pianificazione
Spostare il file jar flink-table-planner_2.12-1.17.0-....jar disponibile nel pod webssh /opt in /lib e spostare il file jar flink-table-planner-loader1.17.0-....jar /opt/flink-webssh/opt/ da /lib. Per altri dettagli, vedere il problema. Per spostare il file JAR di pianificazione, seguire questa procedura.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Nota
Lo spostamento di un file JAR aggiuntivo di pianificazione è necessario solo quando si usa il dialetto Hive o l'endpoint HiveServer2. Tuttavia, si tratta della configurazione consigliata per l'integrazione di Hive.
Convalida
Usare bin/sql-client.sh per connettersi a Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Creare il catalogo Hive e connettersi a esso in Flink SQL
Nota
Poiché si usa già il cluster Flink con il metastore Hive, non è necessario eseguire configurazioni aggiuntive.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Creare una tabella Kafka in Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Creare una tabella MySQL in Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Controllare le tabelle registrate in precedenza nel catalogo Hive in Flink SQL
Eseguire il sink delle informazioni dell'ordine di transazione utente in una tabella di ordini primaria in MySQL su Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Controllare se i dati dell'ordine delle transazioni utente in Kafka sono stati aggiunti nell'ordine della tabella principale in MySQL in Azure Cloud Shell
Creazione di altri tre ordini utente in Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Controllare i dati della tabella Kafka in Flink SQL
Flink SQL> select * from kafka_user_orders;
Inserire product_id=104
nella tabella degli ordini in MySQL in Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Controllare che il record product_id = 104
sia stato aggiunto nella tabella degli ordini in MySQL in Azure Cloud Shell
Riferimento
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi di Apache Software Foundation (ASF).