Dela via


Använda Hive Catalog med Apache Flink® i HDInsight på AKS

Kommentar

Vi drar tillbaka Azure HDInsight på AKS den 31 januari 2025. Före den 31 januari 2025 måste du migrera dina arbetsbelastningar till Microsoft Fabric eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar. Återstående kluster i din prenumeration stoppas och tas bort från värden.

Endast grundläggande stöd kommer att vara tillgängligt fram till datumet för pensionering.

Viktigt!

Den här funktionen finns i förhandsgranskning. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. Om du vill ha frågor eller funktionsförslag skickar du en begäran på AskHDInsight med informationen och följer oss för fler uppdateringar i Azure HDInsight Community.

I det här exemplet används Hive-metaarkivet som en beständig katalog med Apache Flinks Hive-katalog. Vi använder den här funktionen för att lagra Kafka-tabellen och MySQL-tabellmetadata på Flink mellan sessioner. Flink använder Kafka-tabellen som är registrerad i Hive-katalogen som källa, utför ett uppslags- och mottagarresultat till MySQL-databasen

Förutsättningar

Flink erbjuder en dubbel integrering med Hive.

  • Det första steget är att använda Hive Metastore (HMS) som en beständig katalog med Flinks HiveCatalog för att lagra Flink-specifika metadata mellan sessioner.
    • Användare kan till exempel lagra sina Kafka- eller ElasticSearch-tabeller i Hive Metastore med hjälp av HiveCatalog och återanvända dem senare i SQL-frågor.
  • Det andra är att erbjuda Flink som en alternativ motor för att läsa och skriva Hive-tabeller.
  • HiveCatalog är utformad för att vara "out of the box" kompatibel med befintliga Hive-installationer. Du behöver inte ändra ditt befintliga Hive-metaarkiv eller ändra dataplaceringen eller partitioneringen av dina tabeller.

Mer information finns i Apache Hive

Förberedelse av miljö

Låt oss skapa ett Apache Flink-kluster med HMS på Azure Portal. Du kan läsa de detaljerade anvisningarna om hur du skapar Flink-kluster.

Skärmbild som visar hur du skapar Flink-kluster.

När klustret har skapats kontrollerar du att HMS körs eller inte på AKS-sidan.

Skärmbild som visar hur du kontrollerar HMS-status i Flink-klustret.

Förbereda användarordertransaktionsdata Kafka-ämne i HDInsight

Ladda ned kafka-klientburken med följande kommando:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Ta bort tjärfilen med

tar -xvf kafka_2.12-3.2.0.tgz

Skapa meddelandena till Kafka-ämnet.

Skärmbild som visar hur du skapar meddelanden till Kafka-ämnet.

Andra kommandon:

Kommentar

Du måste ersätta bootstrap-server med ditt eget värdnamn eller IP-adress för kafka-koordinatorer

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Förbereda huvuddata för användarorder på MySQL i Azure

Testdatabas:

Skärmbild som visar hur du testar databasen i Kafka.

Skärmbild som visar hur du kör Cloud Shell på portalen.

Förbered ordertabellen:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Med SSH-nedladdning krävs Kafka-anslutningsprogram och MySQL Database-jars

Kommentar

Ladda ned rätt version jar enligt vår HDInsight kafka-version och MySQL-version.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Flytta planner-jar

Flytta jar flink-table-planner_2.12-1.17.0-... jar som finns i webssh-poddens /opt to /lib och flytta ut jar flink-table-planner-loader1.17.0-... jar /opt/flink-webssh/opt/ från /lib. Mer information finns i problemet. Utför följande steg för att flytta planner-jar-filen.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Kommentar

Du behöver bara flytta en extra planner-jar när du använder Hive-dialekten eller HiveServer2-slutpunkten. Detta är dock den rekommenderade konfigurationen för Hive-integrering.

Validering

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Kommentar

Eftersom vi redan använder Flink-kluster med Hive Metastore behöver du inte utföra några ytterligare konfigurationer.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Skärmbild som visar hur du skapar Kafka-tabellen.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Skärmbild som visar hur du skapar mysql-tabellen.

Skärmbild som visar tabellutdata.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Skärmbild som visar hur du sänker användartransaktionen.

Skärmbild som visar användargränssnittet för Flink.

Kontrollera om användartransaktionsorderdata på Kafka läggs till i huvudtabellordning i MySQL i Azure Cloud Shell

Skärmbild som visar hur du kontrollerar användartransaktionen.

Skapa ytterligare tre användarbeställningar på Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Skärmbild som visar hur du kontrollerar Kafka-tabelldata.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Skärmbild som visar hur du kontrollerar ordertabellen.

Kontrollera product_id = 104 att posten har lagts till i ordningstabellen på MySQL i Azure Cloud Shell

Skärmbild som visar de poster som lagts till i ordertabellen.

Referens

  • Apache Hive
  • Apache, Apache Hive, Hive, Apache Flink, Flink och associerade öppen källkod projektnamn är varumärken som tillhör Apache Software Foundation (ASF).