Partager via


Change Data Capture (CDC) de la table PostgreSQL à l’aide d’Apache Flink®

Remarque

Nous allons mettre hors service Azure HDInsight sur AKS le 31 janvier 2025. Avant le 31 janvier 2025, vous devrez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent afin d’éviter leur arrêt brutal. Les clusters restants de votre abonnement seront arrêtés et supprimés de l’hôte.

Seul le support de base est disponible jusqu’à la date de mise hors service.

Important

Cette fonctionnalité est disponible actuellement en mode Aperçu. Les Conditions d’utilisation supplémentaires pour les préversions de Microsoft Azure contiennent davantage de conditions légales qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou ne se trouvant pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les Informations sur la préversion d’Azure HDInsight sur AKS. Si vous avez des questions ou des suggestions de fonctionnalités, soumettez une demande sur AskHDInsight avec les détails et suivez-nous pour obtenir les dernières actualités sur la Communauté Azure HDInsight.

Capture des changements de données (CDC) est une technique que vous pouvez utiliser pour suivre les modifications au niveau des lignes dans les tables de base de données en réponse aux opérations de création, de mise à jour et de suppression. Dans cet article, nous utilisons les Connecteurs CDC pour Apache Flink®, qui proposent un ensemble de connecteurs sources pour Apache Flink. Les connecteurs intègrent Debezium® comme moteur pour capturer les modifications des données.

Flink prend en charge l'interprétation des messages Debezium JSON et Avro en tant que messages INSERT/UPDATE/DELETE dans le système Apache Flink SQL.

Ce support est utile dans de nombreux cas pour :

  • Synchronisez les données incrémentielles des bases de données avec d'autres systèmes
  • Journaux d’audit
  • Créez des vues matérialisées en temps réel sur des bases de données
  • Afficher l'historique des modifications de jointure temporelle d'une table de base de données

Voyons maintenant comment surveiller les modifications sur la table PostgreSQL à l'aide de Flink-SQL CDC. Le connecteur PostgreSQL CDC permet de lire les données instantanées et les données incrémentielles de la base de données PostgreSQL.

Prérequis

Préparer la table et le client PostgreSQL

  • À l'aide d'une machine virtuelle Linux, installez le client PostgreSQL à l'aide des commandes ci-dessous

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • Installez le certificat pour vous connecter au serveur PostgreSQL via SSL

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • Connectez-vous au serveur (remplacez l'hôte, le nom d'utilisateur et le nom de la base de données en conséquence)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • Après vous être connecté avec succès à la base de données, créez un exemple de table

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • Pour activer CDC sur la base de données PostgreSQL, vous devez apporter les modifications suivantes.

    • Le niveau WAL doit être modifié en logical. Cette valeur peut être modifiée dans la section Paramètres du serveur sur le Portail Microsoft Azure.

      Capture d’écran montrant comment activer-cdc-sur-postgres-database.

    • L'utilisateur accédant à la table doit avoir ajouté le rôle 'REPLICATION'

      MODIFIER L'UTILISATEUR <username> AVEC LA RÉPLICATION ;

  • Pour créer la table CDC Flink PostgreSQL, téléchargez tous les fichiers jar dépendants. Utilisez le fichier pom.xml avec le contenu suivant.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • Utilisez la commande maven pour télécharger tous les fichiers jar dépendants

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    Remarque

  • Une fois les jars dépendants téléchargés, démarrez le client Flink SQL, avec ces jars à importer dans la session. Complétez la commande comme suit,

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    Ces commandes démarrent le client SQL avec les dépendances telles que,

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • Créer une table CDC Flink PostgreSQL à l'aide du connecteur CDC

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

Validation

  • Exécutez la commande 'select *' pour surveiller les modifications.

    select * from shipments;

    Capture d’écran montrant comment exécuter la commande de sélection.

Référence