Compartir a través de


Cómo usar el catálogo de Hive con Apache Flink® en HDInsight en AKS

Importante

Azure HDInsight en AKS se retiró el 31 de enero de 2025. Obtenga más información con este anuncio.

Debe migrar las cargas de trabajo a microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo.

Importante

Esta característica está actualmente en versión preliminar. Los Términos de uso complementarios para las versiones preliminares de Microsoft Azure incluyen más términos legales que se aplican a las características de Azure que se encuentran en versión beta, en versión preliminar o, de lo contrario, aún no se han publicado en disponibilidad general. Para obtener información sobre esta versión preliminar específica, consulte información de la versión preliminar de Azure HDInsight en AKS. Para preguntas o sugerencias de características, envíe una solicitud en AskHDInsight con los detalles y siganos para obtener más actualizaciones sobre comunidad de Azure HDInsight.

En este ejemplo se usa metastore de Hive como catálogo persistente con el catálogo de Hive de Apache Flink. Usamos esta funcionalidad para almacenar metadatos de tabla de Kafka y tabla MySQL en Flink entre sesiones. Flink usa la tabla Kafka registrada en el catálogo de Hive como origen, realiza algunas consultas y almacena los resultados en la base de datos MySQL.

Prerrequisitos

Flink ofrece una integración doble con Hive.

  • El primer paso es usar Metastore de Hive (HMS) como catálogo persistente con HiveCatalog de Flink para almacenar metadatos específicos de Flink en las sesiones.
    • Por ejemplo, los usuarios pueden almacenar sus tablas de Kafka o ElasticSearch en Metastore de Hive mediante HiveCatalog y reutilizarlas más adelante en consultas SQL.
  • El segundo es ofrecer Flink como un motor alternativo para leer y escribir tablas de Hive.
  • HiveCatalog está diseñado para ser compatible "listo para usar" con las instalaciones existentes de Hive. No es necesario modificar el metastore de Hive existente ni cambiar la colocación de datos ni el particionado de las tablas.

Para más información, consulte Apache Hive.

Preparación del entorno

Vamos a crear un clúster de Apache Flink con HMS en el portal de Azure, puedes consultar las instrucciones detalladas sobre Flink creación de clústeres.

Captura de pantalla que muestra cómo crear un clúster de Flink.

Después de la creación del clúster, compruebe que HMS se está ejecutando o no en AKS.

Captura de pantalla que muestra cómo comprobar el estado de HMS en el clúster de Flink.

Preparar el tema de Kafka para datos de transacción de órdenes de usuario en HDInsight

Descargue el archivo jar de cliente de kafka mediante el siguiente comando:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Extraer el archivo tar con

tar -xvf kafka_2.12-3.2.0.tgz

Genere los mensajes en el tema de Kafka.

Captura de pantalla que muestra cómo generar mensajes en el tema de Kafka.

Otros comandos:

Nota

Debe reemplazar bootstrap-server por el nombre de host o dirección IP de sus propios agentes de Kafka.

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Preparación de los datos maestros de pedidos de usuario en MySQL en Azure

Prueba de la base de datos:

Captura de pantalla que muestra cómo probar la base de datos en Kafka.

Captura de pantalla que muestra cómo ejecutar Cloud Shell en el portal.

Preparar la tabla de pedidos:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Utilizando SSH descargar los archivos jar necesarios del conector de Kafka y de la base de datos MySQL

Nota

Descargue el archivo JAR de la versión correcta según nuestra versión de Kafka de HDInsight y la versión de MySQL.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Mover el archivo jar del planificador

Mueva el archivo jar flink-table-planner_2.12-1.17.0-....jar ubicado en /opt del pod de webssh a /lib y mueva el archivo jar flink-table-planner-loader1.17.0-....jar desde /lib a /opt/flink-webssh/opt/. Consulte el problema en el documento para obtener más detalles. Realice los siguientes pasos para mover el archivo jar de Planner.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Nota

Un jar de planner adicional en movimiento solo es necesario cuando se usa el dialecto de Hive o el punto de conexión de servicio HiveServer2. Sin embargo, esta es la configuración recomendada para la integración de Hive.

Validación

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Nota

Como ya usamos el clúster de Flink con Hive Metastore, no es necesario realizar ninguna configuración adicional.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Captura de pantalla que muestra cómo crear una tabla de Kafka.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Captura de pantalla que muestra cómo crear una tabla mysql.

Captura de pantalla que muestra la salida de la tabla.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Captura de pantalla en la que se muestra cómo anular una transacción de usuario.

Captura de pantalla que muestra la interfaz de usuario de Flink.

Compruebe si los datos de pedidos de transacciones de usuario en Kafka se agregan en el orden de la tabla maestra en MySQL en Azure Cloud Shell

Captura de pantalla que muestra cómo comprobar la transacción del usuario.

Creación de tres pedidos de usuario más en Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Captura de pantalla que muestra cómo comprobar los datos de la tabla de Kafka.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Captura de pantalla que muestra cómo comprobar la tabla de pedidos.

Compruebe que el registro product_id = 104 se ha añadido en la tabla de órdenes en MySQL en Azure Cloud Shell.

Captura de pantalla que muestra los registros agregados a la tabla de pedidos.

Referencia