Delen via


Hive Catalog gebruiken met Apache Flink® in HDInsight in AKS

Belangrijk

Azure HDInsight op AKS is op 31 januari 2025 buiten gebruik gesteld. Leer meer in deze aankondiging.

U moet uw workloads migreren naar Microsoft Fabric- of een gelijkwaardig Azure-product om plotselinge beëindiging van uw workloads te voorkomen.

Belangrijk

Deze functie is momenteel beschikbaar als preview-versie. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews meer juridische voorwaarden bevatten die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet in algemene beschikbaarheid zijn vrijgegeven. Zie Azure HDInsight in AKS preview-informatievoor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight- met de details en volgt u ons voor meer updates over Azure HDInsight Community-.

In dit voorbeeld wordt de Metastore van Hive gebruikt als een permanente catalogus met de Hive-catalogus van Apache Flink. We gebruiken deze functionaliteit voor het opslaan van kafka-tabel- en MySQL-tabelmetagegevens op Flink in sessies. Flink maakt gebruik van een Kafka-tabel die is geregistreerd in de Hive Catalog als bron, voert enkele opzoekacties uit en voert het resultaat uit naar een MySQL-database.

Voorwaarden

Flink biedt een tweevoudige integratie met Hive.

  • De eerste stap is het gebruik van Hive Metastore (HMS) als permanente catalogus met HiveCatalog van Flink voor het opslaan van specifieke metagegevens in sessies.
    • Gebruikers kunnen bijvoorbeeld hun Kafka- of ElasticSearch-tabellen opslaan in Hive Metastore met behulp van HiveCatalog en ze later opnieuw gebruiken in SQL-query's.
  • De tweede is het aanbieden van Flink als alternatieve engine voor het lezen en schrijven van Hive-tabellen.
  • De HiveCatalog is ontworpen om 'out-of-the-box' compatibel te zijn met bestaande Hive-installaties. U hoeft uw bestaande Hive-metastore niet te wijzigen of de plaatsing of partitionering van uw tabellen te wijzigen.

Zie Apache Hive- voor meer informatie

Omgevingsvoorbereiding

Laten we een Apache Flink-cluster maken met HMS in Azure Portal, u kunt de gedetailleerde instructies voor het maken van een Flink-cluster raadplegen op .

Schermopname van het maken van een Flink-cluster.

Controleer na het maken van het cluster of HMS wordt uitgevoerd of niet aan de AKS-zijde.

Schermopname die laat zien hoe u de HMS-status controleert in het Flink-cluster.

Kafka-onderwerp over transactiegegevens van gebruikersorders voorbereiden in HDInsight

Download het kafka-client-JAR met behulp van de volgende opdracht:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Pak het tar-bestand uit met

tar -xvf kafka_2.12-3.2.0.tgz

De berichten naar het Kafka-onderwerp produceren.

Schermopname waarin wordt getoond hoe berichten naar een Kafka-onderwerp worden geproduceerd.

Andere opdrachten:

Notitie

U moet bootstrap-server vervangen door uw eigen kafka-brokershostnaam of IP-adres

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Hoofdgegevens van gebruikersorders voorbereiden op MySQL in Azure

Database testen:

schermopname waarin wordt getoond hoe u de database in Kafka kunt testen.

schermopname van het uitvoeren van Cloud Shell in de portal.

De ordertabel voorbereiden:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Gebruik SSH om vereiste Kafka-connector en MySQL-database-jars te downloaden.

Notitie

Download de juiste versie-JAR op basis van onze HDInsight kafka-versie en MySQL-versie.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

De planner-jar verplaatsen

Verplaats het jar-bestand flink-table-planner_2.12-1.17.0-....jar, gelegen in de /opt-directory van de webssh pod, naar /lib en verplaats het jar-bestand flink-table-planner-loader1.17.0-....jar vanuit /lib naar /opt/flink-webssh/opt/. Raadpleeg probleem voor meer informatie. Voer de volgende stappen uit om de planner-JAR te verplaatsen.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Notitie

Een extra verplaatsing van de planner-jar is alleen nodig bij gebruik van het Hive-dialect of de HiveServer2-eindpunt. Dit is echter de aanbevolen installatie voor Hive-integratie.

Validering

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Notitie

Aangezien we al een Flink-cluster gebruiken met Hive Metastore, hoeven er geen extra configuraties te worden uitgevoerd.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

schermopname waarin wordt getoond hoe u een Kafka-tabel maakt.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Schermopname waarin wordt getoond hoe u een mysql-tabel maakt.

schermopname met tabeluitvoer.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Schermafbeelding die laat zien hoe een gebruikerstransactie kan worden gestopt.

Schermopname met Flink UI.

Controleer of gegevens van transactieorders voor gebruikers in Kafka zijn toegevoegd in de hoofdtabelvolgorde in MySQL in Azure Cloud Shell

schermopname die laat zien hoe u de transactie van de gebruiker controleert.

Nog drie gebruikersorders maken in Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Schermopname van het controleren van Kafka-tabelgegevens.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Schermopname die laat zien hoe u de tabel met bestellingen controleert.

Controleer of de product_id = 104-record is toegevoegd in de bestellingentabel in MySQL in Azure Cloud Shell.

Schermopname van de records die zijn toegevoegd aan de ordertabel.

Referentie