Hive Catalog gebruiken met Apache Flink® in HDInsight in AKS
Belangrijk
Azure HDInsight op AKS is op 31 januari 2025 buiten gebruik gesteld. Leer meer in deze aankondiging.
U moet uw workloads migreren naar Microsoft Fabric- of een gelijkwaardig Azure-product om plotselinge beëindiging van uw workloads te voorkomen.
Belangrijk
Deze functie is momenteel beschikbaar als preview-versie. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews meer juridische voorwaarden bevatten die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet in algemene beschikbaarheid zijn vrijgegeven. Zie Azure HDInsight in AKS preview-informatievoor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight- met de details en volgt u ons voor meer updates over Azure HDInsight Community-.
In dit voorbeeld wordt de Metastore van Hive gebruikt als een permanente catalogus met de Hive-catalogus van Apache Flink. We gebruiken deze functionaliteit voor het opslaan van kafka-tabel- en MySQL-tabelmetagegevens op Flink in sessies. Flink maakt gebruik van een Kafka-tabel die is geregistreerd in de Hive Catalog als bron, voert enkele opzoekacties uit en voert het resultaat uit naar een MySQL-database.
Voorwaarden
- Apache Flink Cluster in HDInsight op AKS met Hive Metastore 3.1.2
-
Apache Kafka-cluster in HDInsight-
- U moet ervoor zorgen dat de netwerkinstellingen zijn voltooid zoals beschreven in Kafka-; dat is om ervoor te zorgen dat HDInsight op AKS- en HDInsight-clusters zich in hetzelfde VNet bevinden
- MySQL 8.0.33
Apache Hive op Apache Flink
Flink biedt een tweevoudige integratie met Hive.
- De eerste stap is het gebruik van Hive Metastore (HMS) als permanente catalogus met HiveCatalog van Flink voor het opslaan van specifieke metagegevens in sessies.
- Gebruikers kunnen bijvoorbeeld hun Kafka- of ElasticSearch-tabellen opslaan in Hive Metastore met behulp van HiveCatalog en ze later opnieuw gebruiken in SQL-query's.
- De tweede is het aanbieden van Flink als alternatieve engine voor het lezen en schrijven van Hive-tabellen.
- De HiveCatalog is ontworpen om 'out-of-the-box' compatibel te zijn met bestaande Hive-installaties. U hoeft uw bestaande Hive-metastore niet te wijzigen of de plaatsing of partitionering van uw tabellen te wijzigen.
Zie Apache Hive- voor meer informatie
Omgevingsvoorbereiding
Een Apache Flink-cluster maken met HMS
Laten we een Apache Flink-cluster maken met HMS in Azure Portal, u kunt de gedetailleerde instructies voor het maken van een Flink-cluster raadplegen op .
Controleer na het maken van het cluster of HMS wordt uitgevoerd of niet aan de AKS-zijde.
Kafka-onderwerp over transactiegegevens van gebruikersorders voorbereiden in HDInsight
Download het kafka-client-JAR met behulp van de volgende opdracht:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Pak het tar-bestand uit met
tar -xvf kafka_2.12-3.2.0.tgz
De berichten naar het Kafka-onderwerp produceren.
Andere opdrachten:
Notitie
U moet bootstrap-server vervangen door uw eigen kafka-brokershostnaam of IP-adres
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Hoofdgegevens van gebruikersorders voorbereiden op MySQL in Azure
Database testen:
De ordertabel voorbereiden:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Gebruik SSH om vereiste Kafka-connector en MySQL-database-jars te downloaden.
Notitie
Download de juiste versie-JAR op basis van onze HDInsight kafka-versie en MySQL-versie.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
De planner-jar verplaatsen
Verplaats het jar-bestand flink-table-planner_2.12-1.17.0-....jar, gelegen in de /opt-directory van de webssh pod, naar /lib en verplaats het jar-bestand flink-table-planner-loader1.17.0-....jar vanuit /lib naar /opt/flink-webssh/opt/. Raadpleeg probleem voor meer informatie. Voer de volgende stappen uit om de planner-JAR te verplaatsen.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Notitie
Een extra verplaatsing van de planner-jar is alleen nodig bij gebruik van het Hive-dialect of de HiveServer2-eindpunt. Dit is echter de aanbevolen installatie voor Hive-integratie.
Validering
Bin/sql-client.sh gebruiken om verbinding te maken met Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Hive-catalogus maken en verbinding maken met de Hive-catalogus op Flink SQL
Notitie
Aangezien we al een Flink-cluster gebruiken met Hive Metastore, hoeven er geen extra configuraties te worden uitgevoerd.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Kafka-tabel maken in Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
MySQL-tabel maken in Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Controleer de tabellen die zijn geregistreerd in de bovenstaande Hive-catalogus op Flink SQL
Schrijf transactieorderinformatie van gebruikers in de hoofdbestellingentabel in MySQL met Flink SQL.
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Controleer of gegevens van transactieorders voor gebruikers in Kafka zijn toegevoegd in de hoofdtabelvolgorde in MySQL in Azure Cloud Shell
Nog drie gebruikersorders maken in Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Kafka-tabelgegevens controleren op Flink SQL
Flink SQL> select * from kafka_user_orders;
product_id=104
invoegen in de tabel Orders in MySQL op Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Controleer of de product_id = 104
-record is toegevoegd in de bestellingentabel in MySQL in Azure Cloud Shell.
Referentie
- Apache Hive-
- Apache, Apache Hive, Hive, Apache Flink, Flink en bijbehorende opensource-projectnamen zijn handelsmerken van de Apache Software Foundation (ASF).