Ändra datainsamling (CDC) i PostgreSQL-tabellen med Apache Flink®
Viktig
Azure HDInsight på AKS drogs tillbaka den 31 januari 2025. Läs mer genom det här meddelandet.
Du måste migrera dina arbetsbelastningar till Microsoft Fabric- eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar.
Viktig
Den här funktionen är för närvarande i förhandsversion. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS förhandsversionsinformation. För frågor eller funktionsförslag, vänligen skicka en begäran på AskHDInsight med detaljerna och följ oss för fler uppdateringar om Azure HDInsight Community.
Change Data Capture (CDC) är en teknik som du kan använda för att spåra ändringar på radnivå i databastabeller som svar på åtgärder för att skapa, uppdatera och ta bort. I den här artikeln använder vi CDC-kontaktpunkter för Apache Flink®, som erbjuder en uppsättning källkontaktpunkter för Apache Flink. Kopplingarna integrerar Debezium® som motor för att fånga upp dataändringarna.
Flink har stöd för att tolka Debezium JSON- och Avro-meddelanden som INSERT/UPDATE/DELETE-meddelanden i Apache Flink SQL-systemet.
Det här stödet är användbart i många fall för att:
- Synkronisera inkrementella data från databaser till andra system
- Granskningsloggar
- Skapa materialiserade vyer i realtid på databaser
- Visa ändringshistorik för temporal koppling för en databastabell
Nu ska vi lära oss hur du övervakar ändringar i PostgreSQL-tabellen med hjälp av Flink-SQL CDC. Med PostgreSQL CDC-anslutningsappen kan du läsa ögonblicksbildsdata och inkrementella data från PostgreSQL-databasen.
Förutsättningar
- Azure PostgreSQL flexible server Version 14.7
- Apache Flink-kluster i HDInsight på AKS
- Virtuell Linux-dator för att använda PostgreSQL-klienten
- Lägg till NSG-regeln som tillåter inkommande och utgående anslutningar på port 5432 i HDInsight i AKS-poolens undernät.
Förbered PostgreSQL-tabellen & Klient
Installera PostgreSQL-klienten med hjälp av kommandona nedan med hjälp av en virtuell Linux-dator
sudo apt-get update sudo apt-get install postgresql-client
Installera certifikatet för att ansluta till PostgreSQL-servern med SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Anslut till servern (ersätt värd, användarnamn och databasnamn i enlighet med detta)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
När du har anslutit till databasen skapar du en exempeltabell
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Om du vill aktivera CDC på PostgreSQL-databasen måste du göra följande ändringar.
Skapa Apache Flink PostgreSQL CDC-tabell
Om du vill skapa tabellen Flink PostgreSQL CDC laddar du ned alla beroende jar-filer. Använd filen
pom.xml
med följande innehåll.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Använd maven-kommandot för att ladda ned alla beroende jar-filer
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Not
- Om din SSH-webbpod inte innehåller Maven, följ länkarna för att ladda ner och installera det.
- För att ladda ned jsr jar-filen använder du följande kommando
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
När de beroende jar-filerna har laddats ned startar Flink SQL-klienten, med dessa jar-filer som ska importeras till sessionen. Slutför kommandot enligt följande.
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Dessa kommandon startar SQL-klienten med beroendena som,
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Skapa en Flink PostgreSQL CDC-tabell med hjälp av CDC-anslutningsappen
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Validering
Hänvisning
- Apache Flink-webbplats
- PostgreSQL CDC Connector är licensierad under Apache 2.0-licens
- Apache, Apache Flink, Flink och associerade projektnamn med öppen källkod är varumärken av Apache Software Foundation (ASF).