Partage via


Utilisation du catalogue Hive avec Apache Flink® sur HDInsight sur AKS

Important

Azure HDInsight sur AKS a été mis hors service le 31 janvier 2025. En savoir plus sur avec cette annonce.

Vous devez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent pour éviter l’arrêt brusque de vos charges de travail.

Important

Cette fonctionnalité est actuellement en préversion. Les Conditions d’utilisation supplémentaires pour les préversions Microsoft Azure incluent des termes juridiques supplémentaires qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou qui ne sont pas encore publiées en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez informations sur Azure HDInsight sur AKS en préversion. Pour des questions ou des suggestions de fonctionnalités, envoyez une demande sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur Communauté Azure HDInsight.

Cet exemple utilise le Metastore de Hive comme catalogue persistant avec le catalogue Hive d’Apache Flink. Nous utilisons cette fonctionnalité pour stocker la table Kafka et les métadonnées de table MySQL sur Flink entre les sessions. Flink utilise la table Kafka enregistrée dans le catalogue Hive comme source, pour effectuer une recherche et envoyer les résultats dans la base de données MySQL.

Conditions préalables

Flink offre une intégration double avec Hive.

  • La première étape consiste à utiliser Hive Metastore (HMS) comme catalogue persistant avec HiveCatalog de Flink pour stocker des métadonnées spécifiques À Flink entre les sessions.
    • Par exemple, les utilisateurs peuvent stocker leurs tables Kafka ou ElasticSearch dans le metastore Hive à l’aide de HiveCatalog et les réutiliser ultérieurement dans les requêtes SQL.
  • La deuxième consiste à offrir Flink comme moteur de remplacement pour la lecture et l’écriture de tables Hive.
  • HiveCatalog est conçu pour être compatible avec les installations Hive existantes. Vous n’avez pas besoin de modifier votre metastore Hive existant ou de modifier le placement ou le partitionnement des données de vos tables.

Pour plus d’informations, consultez apache Hive

Préparation de l’environnement

Créons un cluster Apache Flink avec HMS sur le portail Azure, en consultant les instructions détaillées sur la création de cluster Flink .

Capture d’écran montrant comment créer un cluster Flink.

Après la création du cluster, vérifiez que HMS est en cours d’exécution ou non côté AKS.

Capture d’écran montrant comment vérifier l’état HMS dans le cluster Flink.

Préparer le sujet Kafka des données de transactions de commandes des utilisateurs sur HDInsight.

Téléchargez le fichier jar du client kafka à l’aide de la commande suivante :

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Extraire le fichier tar avec

tar -xvf kafka_2.12-3.2.0.tgz

Produisez les messages dans la rubrique Kafka.

Capture d’écran montrant comment produire des messages dans la rubrique Kafka.

Autres commandes :

Note

Vous devez remplacer bootstrap-server par votre propre nom d’hôte ou adresse IP de répartiteur kafka

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Préparer les données de référence des commandes utilisateur sur MySQL sur Azure

Base de données de test :

Capture d’écran montrant comment tester la base de données dans Kafka.

Capture d’écran montrant comment exécuter Cloud Shell sur le portail.

Préparer la table de commandes :

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Téléchargez le connecteur Kafka requis et les fichiers JAR de la base de données MySQL en utilisant SSH.

Note

Téléchargez le fichier jar de la version appropriée en fonction de notre version kafka HDInsight et de la version MySQL.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Déplacer le jar du planificateur

Déplacez le fichier jar flink-table-planner_2.12-1.17.0-....jar situé dans le répertoire /opt du pod webssh vers /lib, et déplacez le fichier jar flink-table-planner-loader1.17.0-....jar de /lib vers /opt/flink-webssh/opt/. Pour plus d’informations, consultez la question numéro. Pour déplacer le fichier jar du planificateur, effectuez les étapes suivantes.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Note

Un déplacement supplémentaire d'un fichier jar de plan n’est nécessaire que lors de l’utilisation du dialecte Hive ou du point de terminaison HiveServer2. Toutefois, il s’agit de la configuration recommandée pour l’intégration de Hive.

Validation

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Note

Comme nous utilisons déjà le cluster Flink avec le Metastore Hive, il n’est pas nécessaire d’effectuer des configurations supplémentaires.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Capture d’écran montrant comment créer une table Kafka.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Capture d’écran montrant comment créer une table mysql.

capture d’écran montrant la sortie du tableau.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Capture d’écran montrant comment ignorer la transaction utilisateur.

capture d’écran montrant l’interface utilisateur Flink.

Vérifiez si les données de l’ordre des transactions utilisateur sur Kafka sont ajoutées dans l’ordre de table maître dans MySQL sur Azure Cloud Shell

Capture d’écran montrant comment vérifier la transaction utilisateur.

Création de trois commandes utilisateur supplémentaires sur Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Capture d’écran montrant comment vérifier les données de table Kafka.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Capture d’écran illustrant la vérification du tableau des commandes.

Vérifier que l’enregistrement product_id = 104 est ajouté dans la table de commandes sur MySQL sur Azure Cloud Shell

Capture d’écran montrant les enregistrements ajoutés à la table de commandes.

Référence