Udostępnij za pośrednictwem


Jak używać katalogu Hive z Apache Flink® w HDInsight na AKS

Ważny

Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej z tym ogłoszeniem.

Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.

Ważny

Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe warunki użytkowania dla platformy Microsoft Azure Previews zawierają dodatkowe warunki prawne, które mają zastosowanie do funkcji Azure będących w wersjach beta, zapoznawczych lub innych, które jeszcze nie zostały udostępnione ogółowi. Aby uzyskać informacje na temat tej konkretnej wersji zapoznawczej, zobacz informacje o wersji zapoznawczej Azure HDInsight na AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie dotyczące AskHDInsight, aby uzyskać więcej informacji na temat społeczności usługi Azure HDInsight.

W tym przykładzie Metastore Hive jest używany jako trwały katalog w połączeniu z katalogiem Hive platformy Apache Flink. Używamy tej funkcjonalności do przechowywania metadanych tabeli Kafka i tabeli MySQL na Flink między sesjami. Flink używa tabeli Kafka zarejestrowanej w katalogu Hive jako źródła, przeprowadza pewne wyszukiwania, a następnie przesyła wyniki do bazy danych MySQL.

Warunki wstępne

Flink oferuje dwojaką integrację z Hive.

  • Pierwszym krokiem jest użycie magazynu metadanych Hive (HMS) jako wykazu trwałego z usługą HiveCatalog Flink do przechowywania metadanych specyficznych dla języka Flink między sesjami.
    • Na przykład użytkownicy mogą przechowywać tabele Kafka lub ElasticSearch w magazynie metadanych Hive przy użyciu programu HiveCatalog i ponownie używać ich później w zapytaniach SQL.
  • Drugim jest oferowanie Flink jako alternatywnego aparatu do odczytywania i pisania tabel Hive.
  • Serwer HiveCatalog został zaprojektowany tak, aby był od razu po instalacji zgodny z istniejącymi instalacjami programu Hive. Nie musisz modyfikować istniejącego magazynu metadanych Hive ani zmieniać umieszczania danych ani partycjonowania tabel.

Aby uzyskać więcej informacji, zobacz Apache Hive

Przygotowanie środowiska

Stwórz klaster Apache Flink z HMS w portalu Azure, można zapoznać się ze szczegółowymi instrukcjami dotyczącymi tworzenia klastra Flink .

Zrzut ekranu przedstawiający sposób tworzenia klastra Flink.

Po utworzeniu klastra sprawdź, czy HMS jest uruchomiony po stronie usługi AKS.

Zrzut ekranu przedstawiający sposób sprawdzania stanu HMS w klastrze Flink.

Przygotuj temat danych transakcji zamówień użytkownika w usłudze Kafka na HDInsight

Pobierz plik jar klienta platformy Kafka przy użyciu następującego polecenia:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Rozpakuj plik tar za pomocą

tar -xvf kafka_2.12-3.2.0.tgz

Wygeneruj komunikaty do tematu platformy Kafka.

Zrzut ekranu przedstawiający sposób tworzenia komunikatów na temat platformy Kafka.

Inne polecenia:

Notatka

Musisz zastąpić „bootstrap-server” własną nazwą hosta lub adresem IP brokerów Kafka.

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Przygotowywanie danych głównych zamówień użytkownika w usłudze MySQL na platformie Azure

Testowanie bazy danych:

Zrzut ekranu przedstawiający sposób testowania bazy danych na platformie Kafka.

Zrzut ekranu przedstawiający sposób uruchamiania usługi Cloud Shell w portalu.

Przygotowywanie tabeli zamówień:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Używając SSH, pobierz wymagany łącznik Kafka i pliki JAR bazy danych MySQL.

Notatka

Pobierz poprawną wersję pliku JAR zgodnie z naszą wersją usługi Kafka platformy HDInsight i wersją programu MySQL.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Przenoszenie słoika planera

Przenieś plik jar flink-table-planner_2.12-1.17.0-....jar znajdujący się w podzie webssh /opt do /lib i przenieś plik jar flink-table-planner-loader1.17.0-....jar znajdujący się w /lib do /opt/flink-webssh/opt/. Aby uzyskać więcej informacji, zobacz kwestię . Wykonaj następujące kroki, aby przenieść plik JAR planera.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Notatka

Przenoszenie dodatkowego archiwum jar planisty jest potrzebne tylko w przypadku używania dialektu Hive lub punktu końcowego HiveServer2. Jest to jednak zalecana konfiguracja integracji programu Hive.

Walidacja

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Notatka

Ponieważ używamy już klastra Flink z magazynem metadanych Hive, nie ma potrzeby wykonywania żadnych dodatkowych konfiguracji.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Zrzut ekranu przedstawiający sposób tworzenia tabeli Kafka.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Zrzut ekranu przedstawiający sposób tworzenia tabeli mysql.

Zrzut ekranu przedstawiający dane wyjściowe tabeli.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Zrzut ekranu przedstawiający, jak anulować transakcję użytkownika.

zrzut ekranu przedstawiający interfejs użytkownika Flink.

Sprawdź, czy dane zamówienia transakcji użytkownika na Kafce są dodawane do tabeli głównej w MySQL na Azure Cloud Shell.

Zrzut ekranu przedstawiający sposób sprawdzania transakcji użytkownika.

Tworzenie trzech kolejnych zamówień użytkowników na platformie Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Zrzut ekranu przedstawiający sposób sprawdzania danych tabeli platformy Kafka.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Zrzut ekranu przedstawiający sposób sprawdzania tabeli zamówień.

Sprawdź, czy rekord product_id = 104 jest dodawany w tabeli zamówień na MySQL w Azure Cloud Shell.

Zrzut ekranu przedstawiający rekordy dodane do tabeli zamówień.

Odniesienie