Compartir vía


Captura de datos modificados (CDC) de la tabla postgreSQL mediante Apache Flink®

Importante

Azure HDInsight en AKS se retiró el 31 de enero de 2025. Obtenga más información con este anuncio.

Debe migrar las cargas de trabajo a microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo.

Importante

Esta característica está actualmente en versión preliminar. Los Términos de uso complementarios para las versiones preliminares de Microsoft Azure incluyen más términos legales que se aplican a las características de Azure que se encuentran en versión beta, en versión preliminar o, de lo contrario, aún no se han publicado en disponibilidad general. Para obtener información sobre esta versión preliminar específica, consulte información sobre la versión preliminar de Azure HDInsight en AKS. Para preguntas o sugerencias de características, envíe una solicitud en AskHDInsight con los detalles y síganos para obtener más actualizaciones sobre Comunidad de Azure HDInsight.

La captura de datos modificados (CDC) es una técnica que puede usar para realizar un seguimiento de los cambios de nivel de fila en las tablas de base de datos en respuesta a las operaciones de creación, actualización y eliminación. En este artículo, usamos conectores CDC para Apache Flink®, que ofrecen un conjunto de conectores de origen para Apache Flink. Los conectores integran Debezium® como motor para capturar los cambios de datos.

Flink admite la interpretación de los mensajes JSON y Avro de Debezium como mensajes INSERT/UPDATE/DELETE en el sistema SQL de Apache Flink.

Esta compatibilidad es útil en muchos casos para:

  • Sincronizar datos incrementales de bases de datos a otros sistemas
  • Registros de auditoría
  • Creación de vistas materializadas en tiempo real en bases de datos
  • Visualización del historial de cambios de combinación temporal de una tabla de base de datos

Ahora, vamos a aprender a supervisar los cambios en la tabla postgreSQL mediante Flink-SQL CDC. El conector CDC de PostgreSQL permite leer datos de instantáneas y datos incrementales de la base de datos postgreSQL.

Prerrequisitos

Preparación de la tabla de PostgreSQL & Client

  • Con una máquina virtual Linux, instale el cliente postgreSQL mediante los siguientes comandos.

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • Instalación del certificado para conectarse al servidor postgreSQL mediante SSL

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • Conéctese al servidor (reemplace el host, el nombre de usuario y el nombre de la base de datos en consecuencia).

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • Después de conectarse correctamente a la base de datos, cree una tabla de ejemplo.

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • Para habilitar CDC en la base de datos postgreSQL, es necesario realizar los siguientes cambios.

    • El nivel WAL debe cambiarse a lógico. Este valor se puede cambiar en la sección parámetros del servidor en Azure Portal.

      Captura de pantalla que muestra cómo habilitar-cdc-on-postgres-database.

    • El usuario que accede a la tabla debe tener el rol "REPLICATION" agregado

      ALTER USER <username> WITH REPLICATION;

  • Para crear una tabla CDC de PostgreSQL de Flink, descargue todos los archivos JAR dependientes. Use el archivo pom.xml con el siguiente contenido.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • Use el comando maven para descargar todos los archivos JAR dependientes.

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    Nota

  • Una vez descargados los archivos JAR dependientes, inicie el cliente de Flink SQL, con estos archivos JAR que se importarán en la sesión. Complete el comando como se indica a continuación,

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    Estos comandos inician el cliente sql con las dependencias como:

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • Creación de una tabla CDC de PostgreSQL de Flink mediante el conector CDC

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

Validación

  • Ejecute el comando "select *" para supervisar los cambios.

    select * from shipments;

    Captura de pantalla que muestra cómo ejecutar, seleccionar y comandar.

Referencia