Cdc (Captura de Dados de Alteração) da tabela PostgreSQL usando o Apache Flink®
Importante
O Azure HDInsight no AKS se aposentou em 31 de janeiro de 2025. Saiba mais com este comunicado.
Você precisa migrar suas cargas de trabalho para microsoft fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.
Importante
Esse recurso está atualmente em versão prévia. Os termos de uso complementares para o Microsoft Azure Previews incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, consulte Azure HDInsight em informações de visualização do AKS. Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações sobre a Comunidade Azure HDInsight .
O CDC (Change Data Capture) é uma técnica que você pode usar para acompanhar as alterações no nível de linha nas tabelas de banco de dados em resposta a operações de criação, atualização e exclusão. Neste artigo, usamos conectores CDC para Apache Flink®, que oferecem um conjunto de conectores de origem para Apache Flink. Os conectores integram Debezium® como mecanismo de captura das alterações de dados.
O Flink dá suporte para interpretar mensagens JSON e Avro do Debezium como mensagens INSERT/UPDATE/DELETE no sistema SQL do Apache Flink.
Esse suporte é útil em muitos casos para:
- Sincronizar dados incrementais de bancos de dados para outros sistemas
- Logs de auditoria
- Criar exibições materializadas em tempo real em bancos de dados
- Exibir o histórico de alteração de junção temporal de uma tabela de banco de dados
Agora, vamos aprender a monitorar as alterações na tabela PostgreSQL usando Flink-SQL CDC. O conector CDC do PostgreSQL permite ler dados de instantâneo e dados incrementais do banco de dados PostgreSQL.
Pré-requisitos
- servidor flexível do Azure PostgreSQL versão 14.7
- Cluster do Apache Flink no HDInsight no AKS
- Máquina virtual do Linux para usar o cliente PostgreSQL
- Adicione a regra NSG que permite conexões de entrada e saída na porta 5432 no HDInsight na sub-rede do pool do AKS.
Preparar a tabela PostgreSQL & Client
Usando uma máquina virtual linux, instale o cliente PostgreSQL usando comandos abaixo
sudo apt-get update sudo apt-get install postgresql-client
Instalar o certificado para se conectar ao servidor PostgreSQL usando SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Conectar-se ao servidor (substituir host, nome de usuário e nome de banco de dados adequadamente)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Depois de se conectar ao banco de dados com êxito, crie uma tabela de exemplo
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Para habilitar o CDC no banco de dados PostgreSQL, você precisará fazer as alterações a seguir.
Criar tabela CDC do Apache Flink PostgreSQL
Para criar a tabela Flink PostgreSQL CDC, baixe todos os jars dependentes. Use o arquivo
pom.xml
com o conteúdo a seguir.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Usar o comando maven para baixar todos os jars dependentes
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Nota
- Se o pod web SSH não tiver o Maven, siga os links para baixá-lo e instalá-lo.
- Para baixar o arquivo jsr jar, use o comando a seguir
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Depois que os jars dependentes forem baixados, inicie o cliente SQL do Flink, com esses jars para serem importados na sessão. Conclua o comando da seguinte maneira,
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Esses comandos iniciam o cliente sql com as dependências como,
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Criar uma tabela CDC do PostgreSQL do Flink usando o conector CDC
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Validação
Referência
- site do Apache Flink
- O conector CDC do PostgreSQL é licenciado sob a Licença Apache 2.0
- Apache, Apache Flink, Flink e nomes de projeto de software livre associados são marcas comerciais da Apache Software Foundation (ASF).