Använda Hive Catalog med Apache Flink® i HDInsight på AKS
Kommentar
Vi drar tillbaka Azure HDInsight på AKS den 31 januari 2025. Före den 31 januari 2025 måste du migrera dina arbetsbelastningar till Microsoft Fabric eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar. Återstående kluster i din prenumeration stoppas och tas bort från värden.
Endast grundläggande stöd kommer att vara tillgängligt fram till datumet för pensionering.
Viktigt!
Den här funktionen finns i förhandsgranskning. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. Om du vill ha frågor eller funktionsförslag skickar du en begäran på AskHDInsight med informationen och följer oss för fler uppdateringar i Azure HDInsight Community.
I det här exemplet används Hive-metaarkivet som en beständig katalog med Apache Flinks Hive-katalog. Vi använder den här funktionen för att lagra Kafka-tabellen och MySQL-tabellmetadata på Flink mellan sessioner. Flink använder Kafka-tabellen som är registrerad i Hive-katalogen som källa, utför ett uppslags- och mottagarresultat till MySQL-databasen
Förutsättningar
- Apache Flink-kluster i HDInsight på AKS med Hive Metastore 3.1.2
- Apache Kafka-kluster på HDInsight
- Du måste se till att nätverksinställningarna är fullständiga enligt beskrivningen i Använda Kafka. Det är för att se till att HDInsight på AKS- och HDInsight-kluster finns i samma virtuella nätverk
- MySQL 8.0.33
Apache Hive på Apache Flink
Flink erbjuder en dubbel integrering med Hive.
- Det första steget är att använda Hive Metastore (HMS) som en beständig katalog med Flinks HiveCatalog för att lagra Flink-specifika metadata mellan sessioner.
- Användare kan till exempel lagra sina Kafka- eller ElasticSearch-tabeller i Hive Metastore med hjälp av HiveCatalog och återanvända dem senare i SQL-frågor.
- Det andra är att erbjuda Flink som en alternativ motor för att läsa och skriva Hive-tabeller.
- HiveCatalog är utformad för att vara "out of the box" kompatibel med befintliga Hive-installationer. Du behöver inte ändra ditt befintliga Hive-metaarkiv eller ändra dataplaceringen eller partitioneringen av dina tabeller.
Mer information finns i Apache Hive
Förberedelse av miljö
Skapa ett Apache Flink-kluster med HMS
Låt oss skapa ett Apache Flink-kluster med HMS på Azure Portal. Du kan läsa de detaljerade anvisningarna om hur du skapar Flink-kluster.
När klustret har skapats kontrollerar du att HMS körs eller inte på AKS-sidan.
Förbereda användarordertransaktionsdata Kafka-ämne i HDInsight
Ladda ned kafka-klientburken med följande kommando:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Ta bort tjärfilen med
tar -xvf kafka_2.12-3.2.0.tgz
Skapa meddelandena till Kafka-ämnet.
Andra kommandon:
Kommentar
Du måste ersätta bootstrap-server med ditt eget värdnamn eller IP-adress för kafka-koordinatorer
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Förbereda huvuddata för användarorder på MySQL i Azure
Testdatabas:
Förbered ordertabellen:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Med SSH-nedladdning krävs Kafka-anslutningsprogram och MySQL Database-jars
Kommentar
Ladda ned rätt version jar enligt vår HDInsight kafka-version och MySQL-version.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Flytta planner-jar
Flytta jar flink-table-planner_2.12-1.17.0-... jar som finns i webssh-poddens /opt to /lib och flytta ut jar flink-table-planner-loader1.17.0-... jar /opt/flink-webssh/opt/ från /lib. Mer information finns i problemet. Utför följande steg för att flytta planner-jar-filen.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Kommentar
Du behöver bara flytta en extra planner-jar när du använder Hive-dialekten eller HiveServer2-slutpunkten. Detta är dock den rekommenderade konfigurationen för Hive-integrering.
Validering
Använda bin/sql-client.sh för att ansluta till Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Skapa Hive-katalog och anslut till hive-katalogen på Flink SQL
Kommentar
Eftersom vi redan använder Flink-kluster med Hive Metastore behöver du inte utföra några ytterligare konfigurationer.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Skapa Kafka-tabell i Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Skapa MySQL-tabell i Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Kontrollera tabeller som är registrerade i Hive-katalogen ovan på Flink SQL
Information om användartransaktionsordning för mottagare i huvudordningstabellen i MySQL på Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Kontrollera om användartransaktionsorderdata på Kafka läggs till i huvudtabellordning i MySQL i Azure Cloud Shell
Skapa ytterligare tre användarbeställningar på Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Kontrollera Kafka-tabelldata i Flink SQL
Flink SQL> select * from kafka_user_orders;
Infoga product_id=104
i ordertabellen i MySQL på Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Kontrollera product_id = 104
att posten har lagts till i ordningstabellen på MySQL i Azure Cloud Shell
Referens
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink och associerade öppen källkod projektnamn är varumärken som tillhör Apache Software Foundation (ASF).