Como usar o catálogo do Hive com o Apache Flink® no HDInsight no AKS
Importante
O Azure HDInsight no AKS foi desativado em 31 de janeiro de 2025. Saiba mais com este anúncio.
Você precisa migrar suas cargas de trabalho para Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.
Importante
Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não disponíveis para o público em geral. Para obter informações sobre essa visualização específica, consulte Azure HDInsight no AKS informações de visualização. Para perguntas ou sugestões de funcionalidades, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para mais atualizações na Comunidade do Azure HDInsight .
Este exemplo usa o Metastore do Hive como um catálogo persistente com o Hive Catalog do Apache Flink. Usamos essa funcionalidade para armazenar metadados da tabela Kafka e da tabela MySQL no Flink entre sessões. Flink usa a tabela Kafka registada no Hive Catalog como fonte, realiza algumas pesquisas e envia o resultado para a base de dados MySQL.
Pré-requisitos
- Cluster Apache Flink no HDInsight no AKS com Hive Metastore 3.1.2
-
o cluster Apache Kafka no HDInsight
- O utilizador deve garantir que as configurações de rede estejam completas, conforme descrito em Usando Kafka; isto é para assegurar que o HDInsight no AKS e os clusters do HDInsight estejam na mesma VNet.
- MySQL 8.0.33
Apache Hive no Apache Flink
O Flink oferece uma dupla integração com o Hive.
- A primeira etapa é usar o Hive Metastore (HMS) como um catálogo persistente com o HiveCatalog do Flink para armazenar metadados específicos do Flink em sessões.
- Por exemplo, os usuários podem armazenar suas tabelas Kafka ou ElasticSearch no Hive Metastore usando o HiveCatalog e reutilizá-las posteriormente em consultas SQL.
- O segundo é oferecer o Flink como um motor alternativo para ler e escrever tabelas Hive.
- O HiveCatalog foi projetado para ser "pronto para uso" compatível com as instalações existentes do Hive. Você não precisa modificar seu Hive Metastore existente ou alterar o posicionamento de dados ou particionamento de suas tabelas.
Para obter mais informações, consulte Apache Hive
Preparação do ambiente
Criar um cluster Apache Flink com HMS
Vamos criar um cluster Apache Flink com HMS no portal do Azure, você pode consultar as instruções detalhadas sobre criação de cluster Flink.
Após a criação do cluster, verifique se o HMS está em execução ou não no lado do AKS.
Preparar o tópico Kafka para dados de transação de pedidos do utilizador no HDInsight
Baixe o jar do cliente kafka usando o seguinte comando:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Descompacte o arquivo tar com
tar -xvf kafka_2.12-3.2.0.tgz
Produza as mensagens para o tópico Kafka.
Outros comandos:
Observação
É necessário substituir o bootstrap-server pelo seu próprio nome de anfitrião ou IP dos brokers do Kafka.
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Preparar dados mestre de ordem do usuário no MySQL no Azure
Banco de dados de teste:
Preparar a tabela de pedidos:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Usando SSH, faça o download do conector necessário para o Kafka e dos jars de banco de dados MySQL.
Observação
Faça o download do jar correto de acordo com a versão do Kafka do HDInsight e a versão do MySQL.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Movendo o frasco do planejador
Mova o jar flink-table-planner_2.12-1.17.0-....jar localizado no diretório /opt do pod webssh para /lib e remova o jar flink-table-planner-loader1.17.0-....jar de /lib para /opt/flink-webssh/opt/. Consulte a questão do problema para obter mais detalhes. Execute as etapas a seguir para mover o jar do planejador.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Observação
Um movimento extra do jar do planner só é necessário ao usar o dialeto Hive ou o endpoint HiveServer2. No entanto, esta é a configuração recomendada para a integração do Hive.
Validação
Utilize bin/sql-client.sh para conectar ao Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Crie o catálogo do Hive e conecte-se ao catálogo do Hive no Flink SQL
Observação
Como já usamos o cluster Flink com o Hive Metastore, não há necessidade de executar configurações adicionais.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Criar tabela Kafka no Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Criar tabela MySQL no Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Verifique as tabelas registradas no catálogo do Hive acima no Flink SQL
Insira as informações do pedido de transação do usuário na tabela mestre de pedidos do MySQL usando o Flink SQL.
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Verifique se os dados da ordem de transação do usuário no Kafka são adicionados na ordem da tabela mestre no MySQL no Azure Cloud Shell
Criando mais três pedidos de usuário no Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Verifique os dados da tabela Kafka no Flink SQL
Flink SQL> select * from kafka_user_orders;
Insira product_id=104
na tabela de pedidos no MySQL e no Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Verifique se o registo product_id = 104
foi adicionado na tabela de ordens no MySQL no Azure Cloud Shell
Referência
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).