Condividi tramite


Come usare il catalogo Hive con Apache Flink® su HDInsight su AKS

Importante

Azure HDInsight su AKS è stato ritirato il 31 gennaio 2025. Scopri di più tramite questo annuncio.

È necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare la chiusura brusca dei carichi di lavoro.

Importante

Questa funzionalità è attualmente in anteprima. Le condizioni supplementari per l'utilizzo per le anteprime di Microsoft Azure includono termini legali più validi applicabili alle funzionalità di Azure in versione beta, in anteprima o altrimenti non ancora rilasciate nella disponibilità generale. Per informazioni su questa specifica anteprima, vedere le informazioni sull'anteprima di Azure HDInsight su AKS. Per domande o suggerimenti sulle funzionalità, inviate una richiesta su AskHDInsight con i dettagli e seguiteci per ulteriori aggiornamenti sulla Azure HDInsight Community.

Questo esempio usa il Metastore di Hive come catalogo permanente con il catalogo Hive di Apache Flink. Questa funzionalità viene usata per archiviare i metadati della tabella Kafka e della tabella MySQL in Flink tra le sessioni. Flink utilizza la tabella Kafka registrata nel catalogo Hive come fonte, esegue alcune operazioni di lookup e invia i risultati al database MySQL.

Prerequisiti

Flink offre un'integrazione bidirezionale con Hive.

  • Il primo passaggio consiste nell'usare Hive Metastore (HMS) come catalogo permanente con HiveCatalog di Flink per archiviare metadati specifici Flink tra le sessioni.
    • Ad esempio, gli utenti possono archiviare le tabelle Kafka o ElasticSearch in Metastore Hive usando HiveCatalog e riutilizzarle in un secondo momento nelle query SQL.
  • Il secondo consiste nell'offrire Flink come motore alternativo per la lettura e la scrittura di tabelle Hive.
  • HiveCatalog è progettato per essere compatibile con le installazioni Hive esistenti. Non è necessario modificare il metastore Hive esistente o modificare il posizionamento o il partizionamento dei dati delle tabelle.

Per altre informazioni, vedere Apache Hive

Preparazione dell'ambiente

Creiamo un cluster Apache Flink con HMS sul portale Azure, ed è possibile fare riferimento alle istruzioni dettagliate sulla creazione del cluster Flink.

Screenshot che mostra come creare un cluster Flink.

Dopo la creazione del cluster, verificare se HMS è in esecuzione o meno sul lato di AKS.

Screenshot che mostra come controllare lo stato di HMS nel cluster Flink.

Preparare il topic Kafka per i dati di transazione degli ordini degli utenti in HDInsight

Scaricare il file JAR del client kafka usando il comando seguente:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Estrarre il file tar con

tar -xvf kafka_2.12-3.2.0.tgz

Produci i messaggi nel topic Kafka.

Screenshot che mostra come produrre messaggi all'argomento Kafka.

Altri comandi:

Nota

È necessario sostituire bootstrap-server con il nome host o l'indirizzo IP dei propri broker di Kafka.

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Preparare i dati principali degli ordini utente in MySQL su Azure

Test del Database:

Screenshot che mostra come testare il database in Kafka.

Screenshot che mostra come eseguire Cloud Shell nel portale.

Preparare la tabella degli ordini:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Usando SSH, scarica i connettori necessari di Kafka e i JAR del database MySQL.

Nota

Scaricare il file JAR della versione corretta in base alla versione kafka di HDInsight e alla versione di MySQL.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Spostamento del vaso del planner

Spostare il file jar flink-table-planner_2.12-1.17.0-....jar disponibile nel pod webssh /opt to /lib e spostare il file jar flink-table-planner-loader1.17.0-....jar /opt/flink-webssh/opt/ da /lib. Per ulteriori dettagli, fare riferimento alla questione . Per spostare il file JAR di Planner, seguire questa procedura.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Nota

Lo spostamento di un file JAR aggiuntivo di Planner è necessario solo quando si usa il dialetto Hive o l'endpoint HiveServer2. Tuttavia, si tratta della configurazione consigliata per l'integrazione di Hive.

Convalida

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Nota

Poiché si usa già il cluster Flink con Il metastore Hive, non è necessario eseguire configurazioni aggiuntive.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Screenshot che mostra come creare una tabella Kafka.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Screenshot che mostra come creare la tabella mysql.

Screenshot che mostra l'output della tabella.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Screenshot che mostra come annullare la transazione utente.

Screenshot che mostra l'interfaccia utente Flink.

Controllare se i dati relativi all'ordine delle transazioni utente su Kafka sono stati aggiunti nella tabella principale di MySQL su Azure Cloud Shell.

Screenshot che mostra come controllare la transazione utente.

Creazione di altri tre ordini utente in Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Screenshot che mostra come controllare i dati della tabella Kafka.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Screenshot che mostra come controllare la tabella degli ordini.

Controlla che il record product_id = 104 sia stato aggiunto nella tabella degli ordini su MySQL su Azure Cloud Shell.

Screenshot che mostra i record aggiunti alla tabella degli ordini.

Riferimento