How to use Hive Catalog with Apache Flink on HDInsight on AKS (Jak używać katalogu Hive z usługą Apache Flink® w usłudze HDInsight w usłudze AKS)
Uwaga
Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.
Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.
Ważne
Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.
W tym przykładzie magazyn metadanych hive jest używany jako katalog trwały z katalogiem Hive platformy Apache Flink. Używamy tej funkcji do przechowywania metadanych tabeli platformy Kafka i tabeli MySQL w języku Flink między sesjami. Funkcja Flink używa tabeli platformy Kafka zarejestrowanej w katalogu Hive jako źródła, wykonaj pewne wyniki wyszukiwania i ujścia do bazy danych MySQL
Wymagania wstępne
- Klaster Apache Flink w usłudze HDInsight w usłudze AKS z magazynem metadanych Hive 3.1.2
- Klaster Apache Kafka w usłudze HDInsight
- Musisz upewnić się, że ustawienia sieciowe zostały ukończone zgodnie z opisem w temacie Korzystanie z platformy Kafka. Upewnij się, że usługa HDInsight w klastrach AKS i HDInsight znajduje się w tej samej sieci wirtualnej
- MySQL 8.0.33
Apache Hive na platformie Apache Flink
Funkcja Flink oferuje 2-krotną integrację z programem Hive.
- Pierwszym krokiem jest użycie magazynu metadanych Hive (HMS) jako wykazu trwałego z usługą HiveCatalog Flink do przechowywania metadanych specyficznych dla języka Flink między sesjami.
- Na przykład użytkownicy mogą przechowywać tabele Kafka lub ElasticSearch w magazynie metadanych Hive przy użyciu programu HiveCatalog i ponownie używać ich później w zapytaniach SQL.
- Drugim jest oferowanie Flink jako alternatywnego aparatu do odczytywania i pisania tabel Hive.
- Serwer HiveCatalog został zaprojektowany tak, aby był "poza pudełkiem" zgodny z istniejącymi instalacjami programu Hive. Nie musisz modyfikować istniejącego magazynu metadanych Hive ani zmieniać umieszczania danych ani partycjonowania tabel.
Aby uzyskać więcej informacji, zobacz Apache Hive
Przygotowanie środowiska
Tworzenie klastra Apache Flink za pomocą hmS
Umożliwia utworzenie klastra Apache Flink za pomocą systemu HMS w witrynie Azure Portal. Szczegółowe instrukcje dotyczące tworzenia klastra Flink można znaleźć w artykule .
Po utworzeniu klastra sprawdź, czy HMS jest uruchomiony, czy nie po stronie usługi AKS.
Przygotowywanie tematu danych transakcji zamówienia użytkownika platformy Kafka w usłudze HDInsight
Pobierz plik jar klienta platformy Kafka przy użyciu następującego polecenia:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Usuń plik tar z
tar -xvf kafka_2.12-3.2.0.tgz
Wygeneruj komunikaty do tematu platformy Kafka.
Inne polecenia:
Uwaga
Musisz zastąpić bootstrap-server własną nazwą hosta lub adresem IP brokerów platformy Kafka
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Przygotowywanie danych głównych zamówień użytkownika w usłudze MySQL na platformie Azure
Testowanie bazy danych:
Przygotuj tabelę zamówień:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Korzystanie z wymaganego łącznika platformy Kafka przy użyciu protokołu SSH i plików JAR bazy danych MySQL
Uwaga
Pobierz prawidłowy plik jar wersji zgodnie z naszą wersją platformy Kafka usługi HDInsight i wersją programu MySQL.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Przenoszenie pliku jar planisty
Przenieś plik jar flink-table-planner_2.12-1.17.0-.... plik jar znajdujący się w zasobnikach webssh /opt do /lib i wyjdź plik jar flink-table-planner-loader1.17.0-..... jar /opt/flink-webssh/opt/ from /lib. Aby uzyskać więcej informacji, zobacz problem . Wykonaj następujące kroki, aby przenieść plik jar planisty.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Uwaga
Dodatkowy plik jar planisty jest potrzebny tylko w przypadku korzystania z dialektu Hive lub punktu końcowego HiveServer2. Jest to jednak zalecana konfiguracja integracji programu Hive.
Walidacja
Nawiązywanie połączenia z bazą danych SQL za pomocą bin/sql-client.sh
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Tworzenie wykazu programu Hive i nawiązywanie połączenia z wykazem hive w języku Flink SQL
Uwaga
Ponieważ używamy już klastra Flink z magazynem metadanych Hive, nie ma potrzeby wykonywania żadnych dodatkowych konfiguracji.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Tworzenie tabeli platformy Kafka w języku Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Tworzenie tabeli MySQL w języku Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Sprawdzanie tabel zarejestrowanych w powyższym wykazie programu Hive w języku Flink SQL
Ujście informacji o kolejności transakcji użytkownika do tabeli zamówień głównych w programie MySQL w języku Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Sprawdź, czy dane zamówienia transakcji użytkownika na platformie Kafka są dodawane w kolejności tabeli głównej w usłudze MySQL w usłudze Azure Cloud Shell
Tworzenie trzech kolejnych zamówień użytkowników na platformie Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Sprawdzanie danych tabeli platformy Kafka w języku Flink SQL
Flink SQL> select * from kafka_user_orders;
Wstawianie product_id=104
do tabeli zamówień w usłudze MySQL w języku Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Sprawdzanie product_id = 104
rekordu jest dodawane w tabeli zamówień w usłudze MySQL w usłudze Azure Cloud Shell
Odwołanie
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink i skojarzone nazwy projektów typu open source są znakami towarowymi platformy Apache Software Foundation (ASF).