Jak používat katalog Hive s Apache Flink® ve službě HDInsight v AKS
Důležitý
Azure HDInsight v AKS byl vyřazen 31. ledna 2025. Více se dozvíte v tomto oznámení .
Abyste se vyhnuli náhlému ukončení úloh, musíte migrovat úlohy do Microsoft Fabric nebo ekvivalentního produktu Azure.
Důležitý
Tato funkce je aktuálně ve verzi Preview. Doplňkové podmínky použití pro Microsoft Azure Previews obsahují další právní podmínky, které se vztahují na funkce Azure, jež jsou v beta verzi, ve verzi Preview nebo ještě nebyly obecně dostupné. Informace o této konkrétní verzi Preview najdete v tématu Azure HDInsight ve službě AKS ve verzi Preview. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost s podrobnostmi na AskHDInsight a sledujte nás pro více aktualizací na komunitě Azure HDInsight.
V tomto příkladu se metastore Hive používá jako trvalý katalog v rámci katalogu Hive Apache Flink. Tuto funkci používáme k ukládání tabulek Kafka a metadat tabulek MySQL na Flink napříč relacemi. Flink používá tabulku Kafka zaregistrovanou v katalogu Hive jako zdroj, provádí vyhledávání a ukládá výsledky do databáze MySQL.
Požadavky
- clusteru Apache Flink ve službě HDInsight v AKS s metastorem Hive 3.1.2
-
clusteru Apache Kafka ve službě HDInsight
- Musíte zajistit, aby nastavení sítě bylo kompletní, jak je popsáno v tématu Používání Kafka; to zajistí, že HDInsight na AKS a clustery HDInsight jsou ve stejné virtuální síti.
- MySQL 8.0.33
Apache Hive v Apache Flinku
Flink nabízí dvounásobnou integraci s Hivem.
- Prvním krokem je použití metastoru Hive (HMS) jako trvalého katalogu s HiveCatalogem Flinku pro ukládání metadat specifických pro Flink napříč relacemi.
- Uživatelé můžou například ukládat tabulky Kafka nebo ElasticSearch do metastoru Hive pomocí HiveCatalogu a později je znovu použít v dotazech SQL.
- Druhou možností je nabídnout Flink jako alternativní modul pro čtení a zápis tabulek Hive.
- HiveCatalog je navržený tak, aby byl "mimo krabici" kompatibilní s existujícími instalacemi Hive. Stávající metastore Hive nemusíte upravovat ani měnit umístění dat nebo dělení tabulek.
Další informace najdete v tématu Apache Hive
Příprava prostředí
Vytvoření clusteru Apache Flink pomocí HMS
Vytvořme cluster Apache Flink s HMS na Azure portálu, podrobné pokyny k vytvoření clusteru Flink najdete v části .
Po vytvoření clusteru zkontrolujte, jestli je HMS spuštěný nebo není na straně AKS.
Příprava tématu Kafka o transakcích objednávek uživatelů ve službě HDInsight
Pomocí následujícího příkazu stáhněte soubor JAR klienta Kafka:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Rozbalit tar soubor
tar -xvf kafka_2.12-3.2.0.tgz
Vygenerujte zprávy do tématu Kafka.
Další příkazy:
Poznámka
Musíte nahradit bootstrap-server vlastním názvem hostitele nebo IP adresou zprostředkovatele Kafka.
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Příprava hlavních dat objednávek uživatelů v MySQL v Azure
Testování databáze:
Připravit tabulku objednávek:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Stažení požadovaného konektoru Kafka s využitím SSH a souborů JAR služby MySQL Database
Poznámka
Stáhněte si soubor JAR ve správné verzi podle naší verze HDInsight Kafka a verze MySQL.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Přesunutí plánovacího souboru
Přesuňte soubor jar flink-table-planner_2.12-1.17.0-....jar umístěný v podu webssh v /opt do /lib a soubor jar flink-table-planner-loader1.17.0-....jar z /lib do /opt/flink-webssh/opt/. Další podrobnosti najdete v problému . Proveďte následující kroky k přesunutí JAR plánovače.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Poznámka
Přesunutí dalšího planner jar je potřeba pouze při použití dialektu Hive nebo rozhraní HiveServer2. Toto je ale doporučené nastavení pro integraci Hive.
Validace
Použití bin/sql-client.sh pro připojení k Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Vytvoření katalogu Hive a připojení k katalogu Hive v Flink SQL
Poznámka
Protože už používáme cluster Flink s metastorem Hive, není potřeba provádět žádné další konfigurace.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Vytvoření tabulky Kafka v Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Vytvoření tabulky MySQL v Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Kontrola tabulek registrovaných ve výše uvedeném katalogu Hive v Flink SQL
Uložte informace o objednávkových transakcích uživatele do hlavní tabulky objednávek v MySQL pomocí Flink SQL.
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Zkontrolujte, zda jsou data objednávek transakcí uživatelů v systému Kafka přidána do hlavní tabulky v MySQL na prostředí Azure Cloud Shell.
Vytvoření tří dalších uživatelských objednávek v systému Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Kontrola dat tabulky Kafka v Flink SQL
Flink SQL> select * from kafka_user_orders;
Vložení product_id=104
do tabulky objednávek v MySQL v Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Kontrola přidání záznamu product_id = 104
v tabulce objednávek v MySQL ve službě Azure Cloud Shell
Odkaz
- Apache Hive
- Názvy Apache, Apache Hive, Hive, Apache Flink, Flink a přidružených open-source projektů jsou ochranné známky Apache Software Foundation (ASF).