So verwenden Sie den Hive-Katalog mit Apache Flink® auf HDInsight auf AKS
Wichtig
Azure HDInsight auf AKS wurde am 31. Januar 2025 eingestellt. Erfahren Sie mehr in dieser Ankündigung.
Sie müssen Ihre Workloads zu Microsoft Fabric oder ein gleichwertiges Azure-Produkt migrieren, um eine abrupte Beendigung Ihrer Workloads zu vermeiden.
Wichtig
Dieses Feature befindet sich derzeit in der Vorschau. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure Previews weitere rechtliche Bestimmungen enthalten, die für Azure-Features gelten, die in der Betaversion, in der Vorschau oder auf andere Weise noch nicht in die allgemeine Verfügbarkeit veröffentlicht werden. Informationen zu dieser spezifischen Vorschau finden Sie unter Azure HDInsight auf AKS-Vorschauinformationen. Für Fragen oder Vorschläge zu Funktionen senden Sie bitte eine Anfrage bei AskHDInsight mit den Details und folgen Sie uns, um weitere Updates zur Azure HDInsight Communityzu erhalten.
In diesem Beispiel wird der Metastore von Hive als beständiger Katalog mit dem Hive-Katalog von Apache Flink verwendet. Wir verwenden diese Funktionalität zum Speichern von Kafka-Tabellen- und MySQL-Tabellenmetadaten in Flink über Sitzungen hinweg. Flink verwendet die im Hive-Katalog als Quelle registrierte Kafka-Tabelle, führt einige Nachschlagevorgänge durch und leitet das Ergebnis an die MySQL-Datenbank weiter.
Voraussetzungen
- Apache Flink Cluster auf HDInsight auf AKS mit Hive Metastore 3.1.2
-
Apache Kafka Cluster auf HDInsight
- Es ist erforderlich sicherzustellen, dass die Netzwerkeinstellungen wie unter Verwenden von Kafkabeschrieben vollständig sind; dies ist notwendig, um sicherzustellen, dass sich HDInsight auf AKS und HDInsight-Clustern im selben VNet befinden.
- MySQL 8.0.33
Apache Hive auf Apache Flink
Flink bietet eine zweifache Integration mit Hive.
- Der erste Schritt besteht darin, hive Metastore (HMS) als beständigen Katalog mit dem HiveCatalog von Flink zum Speichern bestimmter Flink-Metadaten in Sitzungen zu verwenden.
- Beispielsweise können Benutzer ihre Kafka- oder ElasticSearch-Tabellen mithilfe von HiveCatalog in Hive Metastore speichern und später in SQL-Abfragen wiederverwenden.
- Der zweite besteht darin, Flink als alternatives Modul zum Lesen und Schreiben von Hive-Tabellen anzubieten.
- Der HiveCatalog ist so konzipiert, dass er mit vorhandenen Hive-Installationen kompatibel ist. Sie müssen ihren vorhandenen Hive-Metaspeicher nicht ändern oder die Datenplatzierung oder Partitionierung Ihrer Tabellen ändern.
Weitere Informationen finden Sie unter Apache Hive
Vorbereitung der Umgebung
Erstellen eines Apache Flink-Clusters mit HMS
Lassen Sie uns ein Apache Flink-Cluster mit HMS im Azure-Portal erstellen, detaillierte Anweisungen dazu finden Sie unter Flink-Cluster-Erstellung.
Überprüfen Sie nach der Clustererstellung, ob HMS auf der AKS-Seite ausgeführt wird oder nicht.
Vorbereiten des Kafka-Topics "Benutzerbestellungstransaktionsdaten" auf HDInsight
Laden Sie den kafka-Client jar mit dem folgenden Befehl herunter:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Entpacken der Tar-Datei mit
tar -xvf kafka_2.12-3.2.0.tgz
Produzieren Sie die Nachrichten zum Thema Kafka.
Andere Befehle:
Anmerkung
Sie müssen bootstrap-server durch Ihren eigenen Kafka-Broker-Hostnamen oder Ihre IP ersetzen.
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Vorbereitung der Stammdaten für Benutzerbestellungen in MySQL auf Azure
DB testen:
Vorbereiten der Bestelltabelle:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Verwenden Sie SSH, um die erforderlichen Kafka-Connector- und MySQL-Datenbank-Jars herunterzuladen.
Anmerkung
Laden Sie den richtigen Versions-JAR entsprechend unserer HDInsight-Kafka-Version und MySQL-Version herunter.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Verschieben der Planner-JAR-Datei
Verschieben Sie die Jar-Datei flink-table-planner_2.12-1.17.0-....jar, die sich im Verzeichnis /opt des webssh-Pods befindet, nach /lib und verschieben Sie die Jar-Datei flink-table-planner-loader1.17.0-....jar von /lib nach /opt/flink-webssh/opt/. Weitere Informationen finden Sie unter Problem. Führen Sie die folgenden Schritte aus, um den Planner-Jar zu verschieben.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Anmerkung
Ein zusätzliches Verschieben des Planer-Jars wird nur benötigt, wenn der Hive-Dialekt oder der HiveServer2-Endpunkt verwendet wird. Dies ist jedoch die empfohlene Einrichtung für die Hive-Integration.
Validierung
Verwenden von bin/sql-client.sh zum Herstellen einer Verbindung mit Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Erstellen eines Hive-Katalogs und Verbinden mit dem Hive-Katalog in Flink SQL
Anmerkung
Da wir bereits Flink-Cluster mit Hive Metastore verwenden, müssen keine zusätzlichen Konfigurationen ausgeführt werden.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Kafka-Tabelle auf Apache Flink SQL erstellen
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Erstellen einer MySQL-Tabelle auf Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Überprüfen von Tabellen, die im obigen Hive-Katalog in Flink SQL registriert sind
Speichern von Benutzertransaktionsbestellinformationen in der Master-Bestellungstabelle in der MySQL-Datenbank über Flink SQL.
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Überprüfen, ob die Benutzertransaktionsdaten auf Kafka der Mastertabelle in MySQL in der Azure Cloud Shell hinzugefügt wurden.
Erstellen von drei weiteren Benutzerbestellungen auf Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Kafka-Tabellendaten in Flink SQL überprüfen
Flink SQL> select * from kafka_user_orders;
Einfügen von product_id=104
in die Tabelle "Bestellungen" in MySQL unter Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Überprüfen Sie, ob der product_id = 104
-Datensatz in der Bestelltabelle in MySQL in Azure Cloud Shell hinzugefügt wird.
Referenz
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink und zugehörige Open Source-Projektnamen sind Marken der Apache Software Foundation (ASF).