Cómo usar el catálogo de Hive con Apache Flink® en HDInsight en AKS
Importante
Azure HDInsight en AKS se retiró el 31 de enero de 2025. Obtenga más información con este anuncio.
Debe migrar las cargas de trabajo a microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo.
Importante
Esta característica está actualmente en versión preliminar. Los Términos de uso complementarios para las versiones preliminares de Microsoft Azure incluyen más términos legales que se aplican a las características de Azure que se encuentran en versión beta, en versión preliminar o, de lo contrario, aún no se han publicado en disponibilidad general. Para obtener información sobre esta versión preliminar específica, consulte información de la versión preliminar de Azure HDInsight en AKS. Para preguntas o sugerencias de características, envíe una solicitud en AskHDInsight con los detalles y siganos para obtener más actualizaciones sobre comunidad de Azure HDInsight.
En este ejemplo se usa metastore de Hive como catálogo persistente con el catálogo de Hive de Apache Flink. Usamos esta funcionalidad para almacenar metadatos de tabla de Kafka y tabla MySQL en Flink entre sesiones. Flink usa la tabla Kafka registrada en el catálogo de Hive como origen, realiza algunas consultas y almacena los resultados en la base de datos MySQL.
Prerrequisitos
- Cluster de Apache Flink en HDInsight en AKS con Hive Metastore 3.1.2
-
clúster de Apache Kafka en HDInsight
- Debe asegurarse de que la configuración de red esté completa, tal como se describe en Uso de Kafka, para garantizar que HDInsight en AKS y los clústeres de HDInsight estén en la misma red virtual.
- MySQL 8.0.33
Apache Hive en Apache Flink
Flink ofrece una integración doble con Hive.
- El primer paso es usar Metastore de Hive (HMS) como catálogo persistente con HiveCatalog de Flink para almacenar metadatos específicos de Flink en las sesiones.
- Por ejemplo, los usuarios pueden almacenar sus tablas de Kafka o ElasticSearch en Metastore de Hive mediante HiveCatalog y reutilizarlas más adelante en consultas SQL.
- El segundo es ofrecer Flink como un motor alternativo para leer y escribir tablas de Hive.
- HiveCatalog está diseñado para ser compatible "listo para usar" con las instalaciones existentes de Hive. No es necesario modificar el metastore de Hive existente ni cambiar la colocación de datos ni el particionado de las tablas.
Para más información, consulte Apache Hive.
Preparación del entorno
Creación de un clúster de Apache Flink con HMS
Vamos a crear un clúster de Apache Flink con HMS en el portal de Azure, puedes consultar las instrucciones detalladas sobre Flink creación de clústeres.
Después de la creación del clúster, compruebe que HMS se está ejecutando o no en AKS.
Preparar el tema de Kafka para datos de transacción de órdenes de usuario en HDInsight
Descargue el archivo jar de cliente de kafka mediante el siguiente comando:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Extraer el archivo tar con
tar -xvf kafka_2.12-3.2.0.tgz
Genere los mensajes en el tema de Kafka.
Otros comandos:
Nota
Debe reemplazar bootstrap-server por el nombre de host o dirección IP de sus propios agentes de Kafka.
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Preparación de los datos maestros de pedidos de usuario en MySQL en Azure
Prueba de la base de datos:
Preparar la tabla de pedidos:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Utilizando SSH descargar los archivos jar necesarios del conector de Kafka y de la base de datos MySQL
Nota
Descargue el archivo JAR de la versión correcta según nuestra versión de Kafka de HDInsight y la versión de MySQL.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Mover el archivo jar del planificador
Mueva el archivo jar flink-table-planner_2.12-1.17.0-....jar ubicado en /opt del pod de webssh a /lib y mueva el archivo jar flink-table-planner-loader1.17.0-....jar desde /lib a /opt/flink-webssh/opt/. Consulte el problema en el documento para obtener más detalles. Realice los siguientes pasos para mover el archivo jar de Planner.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Nota
Un jar de planner adicional en movimiento solo es necesario cuando se usa el dialecto de Hive o el punto de conexión de servicio HiveServer2. Sin embargo, esta es la configuración recomendada para la integración de Hive.
Validación
Uso de bin/sql-client.sh para conectarse a Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Creación de un catálogo de Hive y conexión al catálogo de Hive en Flink SQL
Nota
Como ya usamos el clúster de Flink con Hive Metastore, no es necesario realizar ninguna configuración adicional.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Creación de una tabla de Kafka en Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Creación de una tabla MySQL en Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Comprobación de tablas registradas en el catálogo de Hive anterior en Flink SQL
Cargar la información del pedido de transacción de usuario en la tabla maestra de pedidos de MySQL en Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Compruebe si los datos de pedidos de transacciones de usuario en Kafka se agregan en el orden de la tabla maestra en MySQL en Azure Cloud Shell
Creación de tres pedidos de usuario más en Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Comprobación de datos de tabla de Kafka en Flink SQL
Flink SQL> select * from kafka_user_orders;
Insertar product_id=104
en la tabla orders de MySQL en Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Compruebe que el registro product_id = 104
se ha añadido en la tabla de órdenes en MySQL en Azure Cloud Shell.
Referencia
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink y los nombres de proyecto de código abierto asociados son marcas comerciales de la de Apache Software Foundation (ASF) de.