Freigeben über


Ändern der Datenerfassung (CDC) der PostgreSQL-Tabelle mit Apache Flink®

Wichtig

Azure HDInsight auf AKS wurde am 31. Januar 2025 eingestellt. Erfahren Sie mehr über in dieser Ankündigung.

Sie müssen Ihre Workloads zu Microsoft Fabric oder ein gleichwertiges Azure-Produkt migrieren, um eine abrupte Beendigung Ihrer Workloads zu vermeiden.

Wichtig

Dieses Feature befindet sich derzeit in der Vorschau. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure Previews weitere rechtliche Bestimmungen enthalten, die für Azure-Features gelten, die in der Betaversion, in der Vorschau oder auf andere Weise noch nicht in die allgemeine Verfügbarkeit veröffentlicht werden. Informationen zu dieser spezifischen Vorschau finden Sie unter Vorschauinformationen zu Azure HDInsight auf AKS. Für Fragen oder Funktionsvorschläge senden Sie bitte eine Anfrage an AskHDInsight mit den Details und folgen Sie uns, um weitere Updates von Azure HDInsight Communityzu erhalten.

Change Data Capture (CDC) ist ein Verfahren, mit dem Sie Änderungen auf Zeilenebene in Datenbanktabellen als Reaktion auf Erstellungs-, Aktualisierungs- und Löschvorgänge nachverfolgen können. In diesem Artikel verwenden wir CDC Connectors für Apache Flink®, die eine Reihe von Quell-Connectors für Apache Flink bereitstellen. Die Konnektoren integrieren Debezium® als Engine, um die Datenänderungen zu erfassen.

Flink unterstützt die Interpretation von Debezium JSON- und Avro-Nachrichten als INSERT/UPDATE/DELETE-Nachrichten in Apache Flink SQL-System.

Diese Unterstützung ist in vielen Fällen nützlich, um:

  • Synchronisieren inkrementeller Daten aus Datenbanken mit anderen Systemen
  • Überwachungsprotokolle
  • Erstellen von materialisierten Echtzeitansichten auf Datenbanken
  • Anzeigen des zeitlichen Verknüpfungswechselverlaufs einer Datenbanktabelle

Als Nächstes erfahren wir, wie Änderungen an der PostgreSQL-Tabelle mithilfe Flink-SQL CDC überwacht werden. Der PostgreSQL CDC-Connector ermöglicht das Lesen von Snapshotdaten und inkrementellen Daten aus der PostgreSQL-Datenbank.

Voraussetzungen

Vorbereiten der PostgreSQL-Tabelle & Client

  • Installieren Sie mit einem virtuellen Linux-Computer den PostgreSQL-Client mit den folgenden Befehlen.

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • Installieren des Zertifikats zum Herstellen einer Verbindung mit dem PostgreSQL-Server mit SSL

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • Herstellen einer Verbindung mit dem Server (Host, Benutzername und Datenbankname entsprechend ersetzen)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • Nachdem eine Verbindung mit der Datenbank erfolgreich hergestellt wurde, erstellen Sie eine Beispieltabelle.

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • Um CDC in der PostgreSQL-Datenbank zu aktivieren, müssen Sie die folgenden Änderungen vornehmen.

    • Die WAL-Ebene muss auf logischgeändert werden. Dieser Wert kann im Abschnitt "Serverparameter" im Azure-Portal geändert werden.

      Screenshot, der zeigt, wie Sie die Datenbank

    • Der Benutzer, der auf die Tabelle zugreift, muss die Rolle "REPLIKATION" hinzugefügt haben.

      ALTER USER <username> WITH REPLICATION;

  • Um eine Flink PostgreSQL CDC-Tabelle zu erstellen, laden Sie alle erforderlichen JAR-Dateien herunter. Verwenden Sie die pom.xml Datei mit dem folgenden Inhalt.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • Verwenden Sie den Maven-Befehl, um alle abhängigen JAR-Dateien herunterzuladen.

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    Anmerkung

  • Sobald die abhängigen JARs heruntergeladen sind, starten Sie den Flink SQL-Client, wobei diese JARs in die Sitzung importiert werden sollen. Der vollständige Befehl lautet wie folgt:

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    Diese Befehle starten den SQL-Client mit den Abhängigkeiten wie:

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • Erstellen einer Flink PostgreSQL CDC-Tabelle mit CDC-Verbinder

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

Validierung

  • Führen Sie den Befehl 'select *' aus, um die Änderungen zu überwachen.

    select * from shipments;

    Screenshot, der zeigt, wie der Befehl 'Befehl auswählen und ausführen' verwendet wird.

Referenz