Change Data Capture (CDC) da tabela PostgreSQL usando Apache Flink®
Nota
Vamos desativar o Azure HDInsight no AKS em 31 de janeiro de 2025. Antes de 31 de janeiro de 2025, você precisará migrar suas cargas de trabalho para o Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho. Os clusters restantes na sua subscrição serão interrompidos e removidos do anfitrião.
Apenas o apoio básico estará disponível até à data da reforma.
Importante
Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não disponibilizadas para disponibilidade geral. Para obter informações sobre essa visualização específica, consulte Informações de visualização do Azure HDInsight no AKS. Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações na Comunidade do Azure HDInsight.
A Captura de Dados de Alteração (CDC) é uma técnica que você pode usar para controlar alterações no nível da linha em tabelas de banco de dados em resposta a operações de criação, atualização e exclusão. Neste artigo, usamos Conectores CDC para Apache Flink®, que oferecem um conjunto de conectores de origem para Apache Flink. Os conectores integram Debezium® como o motor para capturar as alterações de dados.
O Flink suporta a interpretação de mensagens Debezium JSON e Avro como mensagens INSERT/UPDATE/DELETE no sistema Apache Flink SQL.
Este apoio é útil em muitos casos para:
- Sincronizar dados incrementais de bancos de dados para outros sistemas
- Registos de auditoria
- Crie visualizações materializadas em tempo real em bancos de dados
- Exibir histórico de alteração de junção temporal de uma tabela de banco de dados
Agora, vamos aprender como monitorar alterações na tabela PostgreSQL usando Flink-SQL CDC. O conector PostgreSQL CDC permite a leitura de dados instantâneos e dados incrementais do banco de dados PostgreSQL.
Pré-requisitos
- Servidor flexível Azure PostgresSQL Versão 14.7
- Cluster Apache Flink no HDInsight no AKS
- Máquina virtual Linux para usar o cliente PostgreSQL
- Adicione a regra NSG que permite conexões de entrada e saída na porta 5432 no HDInsight na sub-rede do pool AKS.
Preparar tabela PostgreSQL & Cliente
Usando uma máquina virtual Linux, instale o cliente PostgreSQL usando os comandos abaixo
sudo apt-get update sudo apt-get install postgresql-client
Instalar o certificado para se conectar ao servidor PostgreSQL usando SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Conecte-se ao servidor (substitua o host, o nome de usuário e o nome do banco de dados de acordo)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Depois de se conectar ao banco de dados com êxito, crie uma tabela de exemplo
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Para habilitar o CDC no banco de dados PostgreSQL, é necessário fazer as seguintes alterações.
Criar tabela Apache Flink PostgreSQL CDC
Para criar a tabela Flink PostgreSQL CDC, baixe todos os jars dependentes. Use o
pom.xml
arquivo com o seguinte conteúdo.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Use o comando maven para baixar todos os frascos dependentes
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Nota
- Se o seu web ssh pod não contém maven, por favor, siga os links para baixá-lo e instalá-lo.
- Para baixar o arquivo jsr jar, use o seguinte comando:
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Depois que os jars dependentes forem baixados, inicie o cliente Flink SQL, com esses jars a serem importados para a sessão. Comando completo da seguinte forma,
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Esses comandos iniciam o cliente sql com as dependências como,
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Criar uma tabela Flink PostgreSQL CDC usando o conector CDC
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Validação
Referência
- Site do Apache Flink
- O PostgreSQL CDC Connector está licenciado sob a Licença Apache 2.0
- Apache, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).