Partage via


Capture de données modifiées (CDC) de la table PostgreSQL à l’aide d’Apache Flink®

Important

Azure HDInsight sur AKS a été mis hors service le 31 janvier 2025. Découvrez-en plus grâce à cette annonce.

Vous devez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent pour éviter l’arrêt brusque de vos charges de travail.

Important

Cette fonctionnalité est actuellement en préversion. Les Conditions d’utilisation supplémentaires pour les préversions Microsoft Azure incluent des termes juridiques supplémentaires qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou qui ne sont pas encore publiées en disponibilité générale. Pour des informations concernant cette préversion spécifique, consultez les informations sur la préversion d'Azure HDInsight sur AKS . Pour des questions ou des suggestions de fonctionnalités, envoyez une demande sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur Communauté Azure HDInsight.

Capture de données modifiées (CDC) est une technique que vous pouvez utiliser pour suivre les modifications au niveau des lignes dans les tables de base de données en réponse aux opérations de création, de mise à jour et de suppression. Dans cet article, nous utilisons connecteurs CDC pour Apache Flink®, qui offrent un ensemble de connecteurs de source pour Apache Flink®. Les connecteurs intègrent Debezium® comme moteur pour la captation des changements de données.

Flink prend en charge l’interprétation des messages JSON et Avro Debezium en tant que messages INSERT/UPDATE/DELETE dans le système SQL Apache Flink.

Cette prise en charge est utile dans de nombreux cas pour :

  • Synchroniser des données incrémentielles à partir de bases de données vers d’autres systèmes
  • Journaux d’audit
  • Créer des vues matérialisées en temps réel sur des bases de données
  • Afficher l’historique des modifications de jointure temporelle d’une table de base de données

À présent, nous allons apprendre à surveiller les modifications sur la table PostgreSQL à l’aide de Flink-SQL cdc. Le connecteur CDC PostgreSQL permet de lire des données d’instantané et des données incrémentielles à partir de la base de données PostgreSQL.

Conditions préalables

Préparer la table PostgreSQL & Client

  • À l’aide d’une machine virtuelle Linux, installez le client PostgreSQL à l’aide des commandes ci-dessous

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • Installer le certificat pour se connecter au serveur PostgreSQL à l’aide de SSL

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • Se connecter au serveur (remplacer l’hôte, le nom d’utilisateur et le nom de la base de données en conséquence)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • Après la connexion à la base de données, créez un exemple de table

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • Pour activer la capture de données modifiées (CDC) sur une base de données PostgreSQL, vous devez apporter les modifications suivantes.

    • Le niveau WAL doit être modifié en logique. Cette valeur peut être modifiée dans la section paramètres du serveur sur le portail Azure.

      Capture d’écran montrant comment activer-cdc-on-postgres-database.

    • L’utilisateur accédant à la table doit avoir le rôle « REPLICATION » ajouté

      ALTER USER <username> WITH REPLICATION ;

  • Pour créer une table CDC Flink PostgreSQL, téléchargez tous les fichiers jar dépendants. Utilisez le fichier pom.xml avec le contenu suivant.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • Utiliser la commande maven pour télécharger tous les fichiers jar dépendants

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    Remarque

  • Une fois que les fichiers jar dépendants sont téléchargés, démarrez le client Flink SQL, avec ces fichiers jar à importer dans la session. Complétez la commande comme suit :

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    Ces commandes démarrent le client sql avec les dépendances comme suit :

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • Utiliser le connecteur CDC pour créer une table CDC Flink PostgreSQL

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

Validation

  • Exécutez la commande « select * » pour surveiller les modifications.

    select * from shipments;

    Capture d’écran montrant comment exécuter-select-command.

Référence