Change Data Capture (CDC) von PostgreSQL-Tabellen mit Apache Flink®
Hinweis
Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.
Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.
Wichtig
Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.
Change Data Capture (CDC) ist eine Methode, mit der Sie als Reaktion auf Erstellungs-, Aktualisierungs- und Löschvorgänge vorgenommene Änderungen auf Zeilenebene in Datenbanktabellen nachverfolgen können. In diesem Artikel verwenden wir CDC-Connectors für Apache Flink®, die eine Reihe von Quellconnectors für Apache Flink bieten. Die Connectors integrieren Debezium® als Modul zum Aufzeichnen der Datenänderungen.
Flink unterstützt die Interpretation von Debezium-JSON- und -Avro-Nachrichten als INSERT-/UPDATE-/DELETE-Nachrichten an das Apache Flink SQL-System.
Diese Unterstützung ist in vielen Fällen nützlich, da sie Folgendes ermöglicht:
- Synchronisieren inkrementeller Daten aus Datenbanken mit anderen Systemen
- Überwachungsprotokolle
- Erstellen materialisierter Echtzeitsichten für Datenbanken
- Anzeigen des temporalen Verlaufs der Verknüpfungsänderungen einer Datenbanktabelle
Als Nächstes erfahren Sie, wie Sie Änderungen an einer PostgreSQL-Tabelle mithilfe von Flink SQL CDC überwachen. Der PostgreSQL-CDC-Connector ermöglicht das Lesen von Momentaufnahmedaten und inkrementellen Daten aus einer PostgreSQL-Datenbank.
Voraussetzungen
- Flexibler Azure PostgresSQL-Server, Version 14.7
- Apache Flink-Cluster in HDInsight auf AKS
- Virtueller Linux-Computer zur Verwendung des PostgreSQL-Clients
- Fügen Sie die NSG-Regel hinzu, die eingehende und ausgehende Verbindungen am Port 5432 in HDInsight im AKS-Poolsubnetz zulässt.
Vorbereiten von PostgreSQL-Tabelle und -Client
Verwenden Sie einen virtuellen Linux-Computer, und installieren Sie den PostgreSQL-Client mithilfe folgender Befehle:
sudo apt-get update sudo apt-get install postgresql-client
Installieren Sie das Zertifikat zum Herstellen einer SSL-Verbindung mit dem PostgreSQL-Server.
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Stellen Sie eine Verbindung mit dem Server her. (Ersetzen Sie dabei Host, Benutzername und Datenbankname entsprechend.)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Erstellen Sie nach erfolgreicher Verbindungsherstellung mit der Datenbank eine Beispieltabelle.
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Um CDC für die PostgreSQL-Datenbank zu aktivieren, sind folgende Änderungen erforderlich.
Erstellen der Apache Flink PostgreSQL CDC-Tabelle
Laden Sie zum Erstellen der Flink PostgreSQL CDC-Tabelle alle abhängigen JAR-Dateien herunter. Verwenden Sie die Datei
pom.xml
mit folgendem Inhalt:<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Verwenden Sie den Maven-Befehl, um alle abhängigen JAR-Dateien herunterzuladen.
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Hinweis
- Wenn Ihr Web-SSH-Pod kein Maven enthält, folgen Sie den Links, um es herunterzuladen und zu installieren.
- Verwenden Sie zum Herunterladen der jsr-JAR-Datei den folgenden Befehl:
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Starten Sie nach dem Herunterladen der abhängigen JAR-Dateien den Flink SQL-Client, und importieren Sie die JAR-Dateien in die Sitzung. Vollständiger Befehl:
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Diese Befehle starten den SQL-Client mit den Abhängigkeiten:
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Erstellen einer Flink PostgreSQL CDC-Tabelle per CDC-Connector
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Überprüfen
Verweis
- Apache Flink Website
- Der PostgreSQL-CDC-Connector wird unter der Apache 2.0-Lizenz lizenziert.
- Apache, Apache Flink, Flink und zugehörige Open Source-Projektnamen sind Handelsmarken der Apache Software Foundation (ASF).