Captura de Alterações de Dados (CDC) da tabela PostgreSQL usando o Apache Flink®
Importante
O Azure HDInsight no AKS foi desativado em 31 de janeiro de 2025. Saiba mais com este anúncio.
Você precisa migrar suas cargas de trabalho para Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.
Importante
Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Versões de Avaliação do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure em versão beta, pré-visualização ou ainda não disponibilizadas ao público em geral. Para obter informações sobre esta visualização específica, consulte as informações de visualização do Azure HDInsight no AKS . Para perguntas ou sugestões de funcionalidades, envie um pedido em AskHDInsight com os detalhes e siga-nos para mais atualizações na Comunidade Azure HDInsight .
A Captura de Dados de Alteração (CDC) é uma técnica que você pode usar para controlar alterações no nível da linha em tabelas de banco de dados em resposta a operações de criação, atualização e exclusão. Neste artigo, usamos Conectores CDC para Apache Flink®, que oferecem um conjunto de conectores de origem para Apache Flink. Os conectores integram Debezium® como motor para a captura das alterações de dados.
O Flink suporta a interpretação de mensagens Debezium JSON e Avro como mensagens INSERT/UPDATE/DELETE no sistema Apache Flink SQL.
Este apoio é útil em muitos casos para:
- Sincronizar dados incrementais de bancos de dados para outros sistemas
- Logs de auditoria
- Crie visualizações materializadas em tempo real em bancos de dados
- Exibir histórico de alteração de junção temporal de uma tabela de banco de dados
Agora, vamos aprender como monitorar alterações na tabela PostgreSQL usando Flink-SQL CDC. O conector PostgreSQL CDC permite a leitura de dados instantâneos e dados incrementais do banco de dados PostgreSQL.
Pré-requisitos
- Servidor Flexível Azure PostgreSQL Versão 14.7
- Cluster Apache Flink em HDInsight em AKS
- Máquina virtual Linux para usar o cliente PostgreSQL
- Adicione a regra NSG que permite conexões de entrada e saída na porta 5432 no HDInsight na sub-rede do pool AKS.
Preparar a tabela PostgreSQL do cliente &
Usando uma máquina virtual Linux, instale o cliente PostgreSQL usando os comandos abaixo
sudo apt-get update sudo apt-get install postgresql-client
Instalar o certificado para se conectar ao servidor PostgreSQL usando SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Conecte-se ao servidor (substitua o host, o nome de usuário e o nome do banco de dados de acordo)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Depois de se conectar ao banco de dados com êxito, crie uma tabela de exemplo
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Para habilitar o CDC no banco de dados PostgreSQL, é necessário fazer as seguintes alterações.
Criar tabela Apache Flink PostgreSQL CDC
Para criar a tabela Flink PostgreSQL CDC, baixe todos os jars dependentes. Use o arquivo
pom.xml
com o seguinte conteúdo.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Use o comando do Maven para baixar todos os ficheiros JAR dependentes
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Observação
- Se o seu web ssh pod não contém maven, por favor, siga os links para baixá-lo e instalá-lo.
- Para baixar o arquivo jsr jar, use o seguinte comando:
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Depois que os jars dependentes forem baixados, inicie o cliente Flink SQL, com esses jars a serem importados para a sessão. Complete o comando da seguinte forma.
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Esses comandos iniciam o cliente sql com as dependências como,
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Criar uma tabela Flink PostgreSQL CDC usando o conector CDC
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Validação
Referência
- Website Apache Flink
- PostgreSQL CDC Connector está licenciado sob Apache 2.0 License
- Apache, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).