Hive Catalog gebruiken met Apache Flink® in HDInsight in AKS
Notitie
Op 31 januari 2025 wordt Azure HDInsight buiten gebruik gesteld op AKS. Vóór 31 januari 2025 moet u uw workloads migreren naar Microsoft Fabric of een gelijkwaardig Azure-product om te voorkomen dat uw workloads plotseling worden beëindigd. De resterende clusters in uw abonnement worden gestopt en verwijderd van de host.
Alleen basisondersteuning is beschikbaar tot de buitengebruikstellingsdatum.
Belangrijk
Deze functie is momenteel beschikbaar in preview. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews bevatten meer juridische voorwaarden die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet beschikbaar zijn in algemene beschikbaarheid. Zie Azure HDInsight op AKS Preview-informatie voor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight met de details en volgt u ons voor meer updates in de Azure HDInsight-community.
In dit voorbeeld wordt de Metastore van Hive gebruikt als een permanente catalogus met de Hive-catalogus van Apache Flink. We gebruiken deze functionaliteit voor het opslaan van kafka-tabel- en MySQL-tabelmetagegevens op Flink in sessies. Flink maakt gebruik van kafka-tabel die is geregistreerd in Hive Catalog als bron, wat opzoek- en sinkresultaten uitvoeren in mySQL-database
Vereisten
- Apache Flink Cluster in HDInsight op AKS met Hive Metastore 3.1.2
- Apache Kafka-cluster in HDInsight
- U moet ervoor zorgen dat de netwerkinstellingen zijn voltooid zoals beschreven in Kafka. Dat is om ervoor te zorgen dat HDInsight op AKS- en HDInsight-clusters zich in hetzelfde VNet bevinden
- MySQL 8.0.33
Apache Hive op Apache Flink
Flink biedt een tweevoudige integratie met Hive.
- De eerste stap is het gebruik van Hive Metastore (HMS) als permanente catalogus met HiveCatalog van Flink voor het opslaan van specifieke metagegevens in sessies.
- Gebruikers kunnen bijvoorbeeld hun Kafka- of ElasticSearch-tabellen opslaan in Hive Metastore met behulp van HiveCatalog en ze later opnieuw gebruiken in SQL-query's.
- De tweede is het aanbieden van Flink als alternatieve engine voor het lezen en schrijven van Hive-tabellen.
- De HiveCatalog is ontworpen om 'out-of-the-box' compatibel te zijn met bestaande Hive-installaties. U hoeft uw bestaande Hive-metastore niet te wijzigen of de plaatsing of partitionering van uw tabellen te wijzigen.
Zie Apache Hive voor meer informatie
Omgevingsvoorbereiding
Een Apache Flink-cluster maken met HMS
Hiermee kunt u een Apache Flink-cluster maken met HMS in Azure Portal. U kunt de gedetailleerde instructies voor het maken van een Flink-cluster raadplegen.
Controleer na het maken van het cluster of HMS wordt uitgevoerd of niet aan de AKS-zijde.
Kafka-onderwerp over transactiegegevens van gebruikersorders voorbereiden in HDInsight
Download het kafka-client-JAR met behulp van de volgende opdracht:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Het tar-bestand opheffen met
tar -xvf kafka_2.12-3.2.0.tgz
De berichten naar het Kafka-onderwerp produceren.
Andere opdrachten:
Notitie
U moet bootstrap-server vervangen door uw eigen kafka-brokershostnaam of IP-adres
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Hoofdgegevens van gebruikersorders voorbereiden op MySQL in Azure
Database testen:
Bereid de ordertabel voor:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Vereiste Kafka-connector en MySQL Database-JAR's voor SSH downloaden
Notitie
Download de juiste versie-JAR op basis van onze HDInsight kafka-versie en MySQL-versie.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
De planner-jar verplaatsen
Verplaats het jar flink-table-planner_2.12-1.17.0-... jar bevindt zich in webssh pod's /opt to /lib en verplaats de jar flink-table-planner-loader1.17.0-... jar /opt/flink-webssh/opt/ van /lib. Raadpleeg het probleem voor meer informatie. Voer de volgende stappen uit om de planner-JAR te verplaatsen.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Notitie
Een extra planner jar-verplaatsing is alleen nodig bij het gebruik van hive dialect of HiveServer2-eindpunt. Dit is echter de aanbevolen installatie voor Hive-integratie.
Validatie
Bin/sql-client.sh gebruiken om verbinding te maken met Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Hive-catalogus maken en verbinding maken met de Hive-catalogus op Flink SQL
Notitie
Omdat we al flink cluster gebruiken met Hive Metastore, hoeft u geen extra configuraties uit te voeren.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Kafka-tabel maken in Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
MySQL-tabel maken in Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Controleer de tabellen die zijn geregistreerd in de bovenstaande Hive-catalogus op Flink SQL
Informatie over transactieorders van sinkgebruikers in hoofdordertabel in MySQL op Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Controleer of gegevens van transactieorders voor gebruikers in Kafka zijn toegevoegd in de hoofdtabelvolgorde in MySQL in Azure Cloud Shell
Nog drie gebruikersorders maken in Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Kafka-tabelgegevens controleren op Flink SQL
Flink SQL> select * from kafka_user_orders;
Invoegen product_id=104
in de tabel Orders in MySQL op Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Controleer of product_id = 104
de record is toegevoegd in de volgordetabel in MySQL in Azure Cloud Shell
Verwijzing
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink en bijbehorende opensource-projectnamen zijn handelsmerken van de Apache Software Foundation (ASF).