Sdílet prostřednictvím


Jak používat katalog Hive s Apache Flink® ve službě HDInsight v AKS

Důležitý

Azure HDInsight v AKS byl vyřazen 31. ledna 2025. Více se dozvíte v tomto oznámení .

Abyste se vyhnuli náhlému ukončení úloh, musíte migrovat úlohy do Microsoft Fabric nebo ekvivalentního produktu Azure.

Důležitý

Tato funkce je aktuálně ve verzi Preview. Doplňkové podmínky použití pro Microsoft Azure Previews obsahují další právní podmínky, které se vztahují na funkce Azure, jež jsou v beta verzi, ve verzi Preview nebo ještě nebyly obecně dostupné. Informace o této konkrétní verzi Preview najdete v tématu Azure HDInsight ve službě AKS ve verzi Preview. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost s podrobnostmi na AskHDInsight a sledujte nás pro více aktualizací na komunitě Azure HDInsight.

V tomto příkladu se metastore Hive používá jako trvalý katalog v rámci katalogu Hive Apache Flink. Tuto funkci používáme k ukládání tabulek Kafka a metadat tabulek MySQL na Flink napříč relacemi. Flink používá tabulku Kafka zaregistrovanou v katalogu Hive jako zdroj, provádí vyhledávání a ukládá výsledky do databáze MySQL.

Požadavky

Flink nabízí dvounásobnou integraci s Hivem.

  • Prvním krokem je použití metastoru Hive (HMS) jako trvalého katalogu s HiveCatalogem Flinku pro ukládání metadat specifických pro Flink napříč relacemi.
    • Uživatelé můžou například ukládat tabulky Kafka nebo ElasticSearch do metastoru Hive pomocí HiveCatalogu a později je znovu použít v dotazech SQL.
  • Druhou možností je nabídnout Flink jako alternativní modul pro čtení a zápis tabulek Hive.
  • HiveCatalog je navržený tak, aby byl "mimo krabici" kompatibilní s existujícími instalacemi Hive. Stávající metastore Hive nemusíte upravovat ani měnit umístění dat nebo dělení tabulek.

Další informace najdete v tématu Apache Hive

Příprava prostředí

Vytvořme cluster Apache Flink s HMS na Azure portálu, podrobné pokyny k vytvoření clusteru Flink najdete v části .

snímek obrazovky znázorňující, jak vytvořit cluster Flink

Po vytvoření clusteru zkontrolujte, jestli je HMS spuštěný nebo není na straně AKS.

snímek obrazovky znázorňující, jak zkontrolovat stav HMS v clusteru Flink

Příprava tématu Kafka o transakcích objednávek uživatelů ve službě HDInsight

Pomocí následujícího příkazu stáhněte soubor JAR klienta Kafka:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Rozbalit tar soubor

tar -xvf kafka_2.12-3.2.0.tgz

Vygenerujte zprávy do tématu Kafka.

Snímek obrazovky znázorňující, jak vytvářet zprávy do tématu Kafka

Další příkazy:

Poznámka

Musíte nahradit bootstrap-server vlastním názvem hostitele nebo IP adresou zprostředkovatele Kafka.

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Příprava hlavních dat objednávek uživatelů v MySQL v Azure

Testování databáze:

Snímek obrazovky ukazující, jak otestovat databázi v systému Kafka

snímek obrazovky znázorňující, jak spustit Cloud Shell na portálu

Připravit tabulku objednávek:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Stažení požadovaného konektoru Kafka s využitím SSH a souborů JAR služby MySQL Database

Poznámka

Stáhněte si soubor JAR ve správné verzi podle naší verze HDInsight Kafka a verze MySQL.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Přesunutí plánovacího souboru

Přesuňte soubor jar flink-table-planner_2.12-1.17.0-....jar umístěný v podu webssh v /opt do /lib a soubor jar flink-table-planner-loader1.17.0-....jar z /lib do /opt/flink-webssh/opt/. Další podrobnosti najdete v problému . Proveďte následující kroky k přesunutí JAR plánovače.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Poznámka

Přesunutí dalšího planner jar je potřeba pouze při použití dialektu Hive nebo rozhraní HiveServer2. Toto je ale doporučené nastavení pro integraci Hive.

Validace

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Poznámka

Protože už používáme cluster Flink s metastorem Hive, není potřeba provádět žádné další konfigurace.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Snímek obrazovky znázorňující, jak vytvořit tabulku Kafka

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Snímek obrazovky znázorňující, jak vytvořit tabulku mysql

Snímek obrazovky s výstupem tabulky

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Snímek obrazovky znázorňující způsob blokování transakce uživatele

snímek obrazovky s uživatelským rozhraním Flink

Zkontrolujte, zda jsou data objednávek transakcí uživatelů v systému Kafka přidána do hlavní tabulky v MySQL na prostředí Azure Cloud Shell.

Snímek obrazovky znázorňující, jak zkontrolovat transakci uživatele

Vytvoření tří dalších uživatelských objednávek v systému Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Snímek obrazovky znázorňující, jak zkontrolovat data tabulky Kafka

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Snímek obrazovky znázorňující, jak zkontrolovat tabulku objednávek

Kontrola přidání záznamu product_id = 104 v tabulce objednávek v MySQL ve službě Azure Cloud Shell

Snímek obrazovky zobrazující záznamy přidané do tabulky objednávek

Odkaz

  • Apache Hive
  • Názvy Apache, Apache Hive, Hive, Apache Flink, Flink a přidružených open-source projektů jsou ochranné známky Apache Software Foundation (ASF).