Partilhar via


Captura de Alterações de Dados (CDC) da tabela PostgreSQL usando o Apache Flink®

Importante

O Azure HDInsight no AKS foi desativado em 31 de janeiro de 2025. Saiba mais com este anúncio.

Você precisa migrar suas cargas de trabalho para Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.

Importante

Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Versões de Avaliação do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure em versão beta, pré-visualização ou ainda não disponibilizadas ao público em geral. Para obter informações sobre esta visualização específica, consulte as informações de visualização do Azure HDInsight no AKS . Para perguntas ou sugestões de funcionalidades, envie um pedido em AskHDInsight com os detalhes e siga-nos para mais atualizações na Comunidade Azure HDInsight .

A Captura de Dados de Alteração (CDC) é uma técnica que você pode usar para controlar alterações no nível da linha em tabelas de banco de dados em resposta a operações de criação, atualização e exclusão. Neste artigo, usamos Conectores CDC para Apache Flink®, que oferecem um conjunto de conectores de origem para Apache Flink. Os conectores integram Debezium® como motor para a captura das alterações de dados.

O Flink suporta a interpretação de mensagens Debezium JSON e Avro como mensagens INSERT/UPDATE/DELETE no sistema Apache Flink SQL.

Este apoio é útil em muitos casos para:

  • Sincronizar dados incrementais de bancos de dados para outros sistemas
  • Logs de auditoria
  • Crie visualizações materializadas em tempo real em bancos de dados
  • Exibir histórico de alteração de junção temporal de uma tabela de banco de dados

Agora, vamos aprender como monitorar alterações na tabela PostgreSQL usando Flink-SQL CDC. O conector PostgreSQL CDC permite a leitura de dados instantâneos e dados incrementais do banco de dados PostgreSQL.

Pré-requisitos

Preparar a tabela PostgreSQL do cliente &

  • Usando uma máquina virtual Linux, instale o cliente PostgreSQL usando os comandos abaixo

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • Instalar o certificado para se conectar ao servidor PostgreSQL usando SSL

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • Conecte-se ao servidor (substitua o host, o nome de usuário e o nome do banco de dados de acordo)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • Depois de se conectar ao banco de dados com êxito, crie uma tabela de exemplo

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • Para habilitar o CDC no banco de dados PostgreSQL, é necessário fazer as seguintes alterações.

    • O nível WAL deve ser alterado para lógico. Esse valor pode ser alterado na seção de parâmetros do servidor no portal do Azure.

      Captura de tela mostrando como habilitar-cdc-on-postgres-database.

    • O usuário que acessa a tabela deve ter a função 'REPLICATION' adicionada

      ALTER USER <username> COM REPLICAÇÃO;

  • Para criar a tabela Flink PostgreSQL CDC, baixe todos os jars dependentes. Use o arquivo pom.xml com o seguinte conteúdo.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • Use o comando do Maven para baixar todos os ficheiros JAR dependentes

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    Observação

  • Depois que os jars dependentes forem baixados, inicie o cliente Flink SQL, com esses jars a serem importados para a sessão. Complete o comando da seguinte forma.

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    Esses comandos iniciam o cliente sql com as dependências como,

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • Criar uma tabela Flink PostgreSQL CDC usando o conector CDC

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

Validação

  • Execute o comando 'select *' para monitorar as alterações.

    select * from shipments;

    Captura de tela mostrando como executar-select-command.

Referência