Ändern der Datenerfassung (CDC) der PostgreSQL-Tabelle mit Apache Flink®
Wichtig
Azure HDInsight auf AKS wurde am 31. Januar 2025 eingestellt. Erfahren Sie mehr über in dieser Ankündigung.
Sie müssen Ihre Workloads zu Microsoft Fabric oder ein gleichwertiges Azure-Produkt migrieren, um eine abrupte Beendigung Ihrer Workloads zu vermeiden.
Wichtig
Dieses Feature befindet sich derzeit in der Vorschau. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure Previews weitere rechtliche Bestimmungen enthalten, die für Azure-Features gelten, die in der Betaversion, in der Vorschau oder auf andere Weise noch nicht in die allgemeine Verfügbarkeit veröffentlicht werden. Informationen zu dieser spezifischen Vorschau finden Sie unter Vorschauinformationen zu Azure HDInsight auf AKS. Für Fragen oder Funktionsvorschläge senden Sie bitte eine Anfrage an AskHDInsight mit den Details und folgen Sie uns, um weitere Updates von Azure HDInsight Communityzu erhalten.
Change Data Capture (CDC) ist ein Verfahren, mit dem Sie Änderungen auf Zeilenebene in Datenbanktabellen als Reaktion auf Erstellungs-, Aktualisierungs- und Löschvorgänge nachverfolgen können. In diesem Artikel verwenden wir CDC Connectors für Apache Flink®, die eine Reihe von Quell-Connectors für Apache Flink bereitstellen. Die Konnektoren integrieren Debezium® als Engine, um die Datenänderungen zu erfassen.
Flink unterstützt die Interpretation von Debezium JSON- und Avro-Nachrichten als INSERT/UPDATE/DELETE-Nachrichten in Apache Flink SQL-System.
Diese Unterstützung ist in vielen Fällen nützlich, um:
- Synchronisieren inkrementeller Daten aus Datenbanken mit anderen Systemen
- Überwachungsprotokolle
- Erstellen von materialisierten Echtzeitansichten auf Datenbanken
- Anzeigen des zeitlichen Verknüpfungswechselverlaufs einer Datenbanktabelle
Als Nächstes erfahren wir, wie Änderungen an der PostgreSQL-Tabelle mithilfe Flink-SQL CDC überwacht werden. Der PostgreSQL CDC-Connector ermöglicht das Lesen von Snapshotdaten und inkrementellen Daten aus der PostgreSQL-Datenbank.
Voraussetzungen
- Azure PostgreSQL Flexible Server Version 14.7
- Apache Flink Cluster auf HDInsight und AKS
- Virtueller Linux-Computer zur Verwendung des PostgreSQL-Clients
- Fügen Sie die NSG-Regel hinzu, die eingehende und ausgehende Verbindungen am Port 5432 in HDInsight im AKS-Pool-Subnetz zulässt.
Vorbereiten der PostgreSQL-Tabelle & Client
Installieren Sie mit einem virtuellen Linux-Computer den PostgreSQL-Client mit den folgenden Befehlen.
sudo apt-get update sudo apt-get install postgresql-client
Installieren des Zertifikats zum Herstellen einer Verbindung mit dem PostgreSQL-Server mit SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Herstellen einer Verbindung mit dem Server (Host, Benutzername und Datenbankname entsprechend ersetzen)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Nachdem eine Verbindung mit der Datenbank erfolgreich hergestellt wurde, erstellen Sie eine Beispieltabelle.
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Um CDC in der PostgreSQL-Datenbank zu aktivieren, müssen Sie die folgenden Änderungen vornehmen.
Apache Flink PostgreSQL CDC-Tabelle erstellen
Um eine Flink PostgreSQL CDC-Tabelle zu erstellen, laden Sie alle erforderlichen JAR-Dateien herunter. Verwenden Sie die
pom.xml
Datei mit dem folgenden Inhalt.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Verwenden Sie den Maven-Befehl, um alle abhängigen JAR-Dateien herunterzuladen.
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Anmerkung
- Wenn Ihr Web ssh-Pod keine Maven enthält, folgen Sie den Links, um sie herunterzuladen und zu installieren.
- Um jsr jar-Datei herunterzuladen, verwenden Sie den folgenden Befehl.
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Sobald die abhängigen JARs heruntergeladen sind, starten Sie den Flink SQL-Client, wobei diese JARs in die Sitzung importiert werden sollen. Der vollständige Befehl lautet wie folgt:
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Diese Befehle starten den SQL-Client mit den Abhängigkeiten wie:
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Erstellen einer Flink PostgreSQL CDC-Tabelle mit CDC-Verbinder
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Validierung
Referenz
- Apache Flink Website
- PostgreSQL CDC Connector ist unter der Apache 2.0 License lizenziert.
- Apache, Apache Flink, Flink und zugehörige Open Source-Projektnamen sind Marken der Apache Software Foundation (ASF).