Como usar o Catálogo do Hive com o Apache Flink® no HDInsight no AKS
Importante
O Azure HDInsight no AKS se aposentou em 31 de janeiro de 2025. Saiba mais com este comunicado.
Você precisa migrar suas cargas de trabalho para microsoft fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.
Importante
Esse recurso está atualmente em versão prévia. Os termos de uso complementares para o Microsoft Azure Previews incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, consulte Azure HDInsight em informações de visualização do AKS. Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações na Comunidade Azure HDInsight .
Este exemplo usa o Metastore do Hive como um catálogo persistente com o Catálogo de Hive do Apache Flink. Usamos essa funcionalidade para armazenar metadados da tabela Kafka e da tabela MySQL no Flink entre sessões. O Flink usa a tabela Kafka registrada no Catálogo do Hive como fonte, realiza algumas consultas e insere o resultado no banco de dados MySQL.
Pré-requisitos
- Cluster do Apache Flink no HDInsight no AKS com o Metastore 3.1.2 do Hive
-
cluster do Apache Kafka no HDInsight
- Você precisa garantir que as configurações de rede sejam concluídas conforme descrito em usando o Kafka; isso é para garantir que o HDInsight em clusters AKS e HDInsight esteja na mesma VNet
- MySQL 8.0.33
Apache Hive no Apache Flink
O Flink oferece uma integração em duas etapas com o Hive.
- A primeira etapa é usar o HMS (Metastore do Hive) como um catálogo persistente com HiveCatalog do Flink para armazenar metadados específicos do Flink entre sessões.
- Por exemplo, os usuários podem armazenar suas tabelas Kafka ou ElasticSearch no Metastore do Hive usando HiveCatalog e reutilizá-las posteriormente em consultas SQL.
- A segunda é oferecer o Flink como um mecanismo alternativo para ler e escrever tabelas do Hive.
- O HiveCatalog foi projetado para ser "pronto para uso imediato", totalmente compatível com as instalações do Hive existentes. Você não precisa modificar o Metastore do Hive existente ou alterar o posicionamento de dados ou o particionamento de suas tabelas.
Para obter mais informações, consulte Apache Hive
Preparação do ambiente
Criar um cluster do Apache Flink com o HMS
Permite criar um cluster Apache Flink com o HMS no portal do Azure, você pode consultar as instruções detalhadas sobre criação de cluster Flink.
Após a criação do cluster, verifique se o HMS está em execução ou não no lado do AKS.
Preparar o tópico Kafka de dados de transação de pedido de usuário no HDInsight
Baixe o jar do cliente kafka usando o seguinte comando:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Desatar o arquivo tar com
tar -xvf kafka_2.12-3.2.0.tgz
Produza as mensagens para o tópico Kafka.
Outros comandos:
Nota
Você deve substituir bootstrap-server por seu próprio nome de host ou IP de brokers Kafka
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Preparar dados mestres do pedido do usuário no MySQL no Azure
Testando o BD:
Preparar a tabela de pedidos:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Usando SSH para baixar o conector Kafka necessário e os jars do Banco de Dados MySQL
Nota
Baixe o jar na versão correta de acordo com nossa versão do Kafka do HDInsight e a versão do MySQL.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Mover o arquivo jar do planejador
Mova o arquivo jar flink-table-planner_2.12-1.17.0-....jar localizado no diretório /opt do pod webssh para /lib e retire o arquivo jar flink-table-planner-loader1.17.0-....jar do diretório /lib para /opt/flink-webssh/opt/. Consulte a questão , para obter mais detalhes. Execute as etapas a seguir para mover o jar do planejador.
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Nota
Uma movimentação extra do jar de planejamento só é necessária ao usar o dialeto Hive ou o endpoint HiveServer2. No entanto, essa é a configuração recomendada para a integração do Hive.
Validação
Usar o bin/sql-client.sh para se conectar ao Flink SQL
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Criar catálogo do Hive e conectar-se ao catálogo do Hive no Flink SQL
Nota
Como já usamos o cluster Flink com o Metastore do Hive, não é necessário executar nenhuma configuração adicional.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Criar Tabela Kafka no SQL do Apache Flink
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Criar tabela MySQL no SQL do Apache Flink
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Verificar tabelas registradas no catálogo do Hive acima no Flink SQL
Inserir informações de pedidos de transação do usuário na tabela de pedidos principal no MySQL no Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Verificar se os dados da ordem de transação do usuário no Kafka foram adicionados na ordem de tabela mestra no MySQL no Azure Cloud Shell
Criando mais três pedidos de usuário no Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Verifique os dados da tabela Kafka no SQL do Flink.
Flink SQL> select * from kafka_user_orders;
Inserir product_id=104
na tabela pedidos no MySQL no Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Verificar se o registro product_id = 104
é adicionado na tabela de ordens no MySQL no Azure Cloud Shell
Referência
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink e nomes de projeto de software livre associados são marcas comerciais do ASF (Apache Software Foundation).