Freigeben über


Change Data Capture (CDC) von PostgreSQL-Tabellen mit Apache Flink®

Hinweis

Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.

Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.

Wichtig

Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.

Change Data Capture (CDC) ist eine Methode, mit der Sie als Reaktion auf Erstellungs-, Aktualisierungs- und Löschvorgänge vorgenommene Änderungen auf Zeilenebene in Datenbanktabellen nachverfolgen können. In diesem Artikel verwenden wir CDC-Connectors für Apache Flink®, die eine Reihe von Quellconnectors für Apache Flink bieten. Die Connectors integrieren Debezium® als Modul zum Aufzeichnen der Datenänderungen.

Flink unterstützt die Interpretation von Debezium-JSON- und -Avro-Nachrichten als INSERT-/UPDATE-/DELETE-Nachrichten an das Apache Flink SQL-System.

Diese Unterstützung ist in vielen Fällen nützlich, da sie Folgendes ermöglicht:

  • Synchronisieren inkrementeller Daten aus Datenbanken mit anderen Systemen
  • Überwachungsprotokolle
  • Erstellen materialisierter Echtzeitsichten für Datenbanken
  • Anzeigen des temporalen Verlaufs der Verknüpfungsänderungen einer Datenbanktabelle

Als Nächstes erfahren Sie, wie Sie Änderungen an einer PostgreSQL-Tabelle mithilfe von Flink SQL CDC überwachen. Der PostgreSQL-CDC-Connector ermöglicht das Lesen von Momentaufnahmedaten und inkrementellen Daten aus einer PostgreSQL-Datenbank.

Voraussetzungen

Vorbereiten von PostgreSQL-Tabelle und -Client

  • Verwenden Sie einen virtuellen Linux-Computer, und installieren Sie den PostgreSQL-Client mithilfe folgender Befehle:

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • Installieren Sie das Zertifikat zum Herstellen einer SSL-Verbindung mit dem PostgreSQL-Server.

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • Stellen Sie eine Verbindung mit dem Server her. (Ersetzen Sie dabei Host, Benutzername und Datenbankname entsprechend.)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • Erstellen Sie nach erfolgreicher Verbindungsherstellung mit der Datenbank eine Beispieltabelle.

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • Um CDC für die PostgreSQL-Datenbank zu aktivieren, sind folgende Änderungen erforderlich.

    • „wal_level“ muss in LOGICAL geändert werden. Dieser Wert kann im Abschnitt „Serverparameter“ im Azure-Portal geändert werden.

      Screenshot von enable-cdc-on-postgres-database.

    • Benutzer*innen, die auf die Tabelle zugreifen, muss die Replikationsrolle hinzugefügt worden sein.

      ALTER USER <username> WITH REPLICATION;

  • Laden Sie zum Erstellen der Flink PostgreSQL CDC-Tabelle alle abhängigen JAR-Dateien herunter. Verwenden Sie die Datei pom.xml mit folgendem Inhalt:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • Verwenden Sie den Maven-Befehl, um alle abhängigen JAR-Dateien herunterzuladen.

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    Hinweis

  • Starten Sie nach dem Herunterladen der abhängigen JAR-Dateien den Flink SQL-Client, und importieren Sie die JAR-Dateien in die Sitzung. Vollständiger Befehl:

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    Diese Befehle starten den SQL-Client mit den Abhängigkeiten:

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • Erstellen einer Flink PostgreSQL CDC-Tabelle per CDC-Connector

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

Überprüfen

  • Führen Sie den Befehl „select *“ aus, um die Änderungen zu überwachen.

    select * from shipments;

    Screenshot von run-select-command.

Verweis