Partager via


Comment utiliser un catalogue Hive avec Apache Flink® sur HDInsight sur AKS

Remarque

Nous allons mettre hors service Azure HDInsight sur AKS le 31 janvier 2025. Avant le 31 janvier 2025, vous devrez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent afin d’éviter leur arrêt brutal. Les clusters restants de votre abonnement seront arrêtés et supprimés de l’hôte.

Seul le support de base est disponible jusqu’à la date de mise hors service.

Important

Cette fonctionnalité est disponible actuellement en mode Aperçu. Les Conditions d’utilisation supplémentaires pour les préversions de Microsoft Azure contiennent davantage de conditions légales qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou ne se trouvant pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les Informations sur la préversion d’Azure HDInsight sur AKS. Pour toute question ou pour des suggestions à propos des fonctionnalités, veuillez envoyer vos requêtes et leurs détails sur AskHDInsight, et suivez-nous sur la Communauté Azure HDInsight pour plus de mises à jour.

Cet exemple utilise le Metastore de Hive comme catalogue persistant avec le catalogue Hive d’Apache Flink. Nous utilisons cette fonctionnalité pour stocker les métadonnées des tables Kafka et MySQL sur Flink au fil des sessions. Flink utilise la table Kafka enregistrée dans le catalogue Hive comme source, effectue une recherche et envoie le résultat dans la base de données MySQL

Prérequis

Flink propose une double intégration avec Hive.

  • La première étape consiste à utiliser metastore Hive (HMS) comme catalogue persistant avec HiveCatalog de Flink pour stocker les métadonnées spécifiques à Flink au fil des sessions.
    • Par exemple, les utilisateurs peuvent stocker leurs tables Kafka ou ElasticSearch dans metastore Hive à l'aide de HiveCatalog et les réutiliser ultérieurement dans des requêtes SQL.
  • La seconde consiste à proposer Flink comme moteur alternatif de lecture et d’écriture de tables Hive.
  • Le HiveCatalog est conçu pour être « prêt à l’emploi » compatible avec les installations Hive existantes. Vous n'avez pas besoin de modifier votre metastore Hive existant ni de modifier le placement des données ou le partitionnement de vos tables.

Pour plus d’informations, consultez Apache Hive

Préparation de l’environnement

Créons un cluster Apache Flink avec HMS sur le Portail Microsoft Azure, vous pouvez vous référer aux instructions détaillées sur la création de cluster Flink.

Capture d’écran montrant comment créer un cluster Flink.

Après la création du cluster, vérifiez que HMS est en cours d'exécution ou non du côté AKS.

Capture d’écran montrant comment vérifier l’état HMS dans le cluster Flink.

Préparer les données de transaction de commande utilisateur, sujet Kafka sur HDInsight

Téléchargez le fichier jar du client Kafka à l'aide de la commande suivante :

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Décompressez le fichier tar avec

tar -xvf kafka_2.12-3.2.0.tgz

Produisez les messages sur le sujet Kafka.

Capture d’écran montrant comment produire des messages dans la rubrique Kafka.

Autres commandes :

Remarque

Vous devez remplacer le serveur d'amorçage par le nom d'hôte ou l'adresse IP de votre propre courtier Kafka

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Préparer les données principales des commandes utilisateur sur MySQL sur Azure

Base de données de test :

Capture d’écran montrant comment tester la base de données dans Kafka.

Capture d’écran montrant comment exécuter Cloud Shell sur le portail.

Préparez le tableau de commande :

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

L'utilisation du téléchargement SSH nécessite le connecteur Kafka et les fichiers jar de la base de données MySQL

Remarque

Téléchargez la version jar correcte selon notre version HDInsight kafka et notre version MySQL.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Déplacer le pot du planificateur

Déplacez le fichier jar flink-table-planner_2.12-1.17.0-....jar situé dans le pod webssh /opt to /lib et déplacez le fichier jar flink-table-planner-loader1.17.0-....jar /opt/flink-webssh/opt/ à partir de /lib. Rapportez-vous au problème pour plus de détails. Effectuez les étapes suivantes pour déplacer le pot du planificateur.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Remarque

Un déplacement de pot de planificateur supplémentaire n'est nécessaire que lors de l'utilisation du dialecte Hive ou du point de terminaison HiveServer2.. Cependant, il s'agit de la configuration recommandée pour l'intégration de Hive.

Validation

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Remarque

Comme nous utilisons déjà le cluster Flink avec metastore Hive, il n'est pas nécessaire d'effectuer de configurations supplémentaires.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Capture d’écran montrant comment créer une table Kafka.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Capture d’écran montrant comment créer une table mysql.

Capture d’écran montrant la sortie de table.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Capture d’écran montrant comment recevoir la transaction utilisateur.

Capture d’écran montrant l’interface utilisateur Flink.

Vérifiez si les données de commande des transactions utilisateur sur Kafka sont ajoutées dans l'ordre de la table principale dans MySQL sur Azure Cloud Shell

Capture d’écran montrant comment vérifier la transaction utilisateur.

Création de trois commandes utilisateur supplémentaires sur Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Capture d’écran montrant comment vérifier les données de table Kafka.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Capture d’écran montrant comment vérifier la table des commandes.

L'enregistrement de product_id = 104 contrôle est ajouté dans la table de commande sur MySQL sur Azure Cloud Shell

Capture d’écran montrant les enregistrements ajoutés à la table de commandes.

Référence