Ändra datainsamling (CDC) i PostgreSQL-tabellen med Apache Flink®
Kommentar
Vi drar tillbaka Azure HDInsight på AKS den 31 januari 2025. Före den 31 januari 2025 måste du migrera dina arbetsbelastningar till Microsoft Fabric eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar. Återstående kluster i din prenumeration stoppas och tas bort från värden.
Endast grundläggande stöd kommer att vara tillgängligt fram till datumet för pensionering.
Viktigt!
Den här funktionen finns i förhandsgranskning. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. Om du vill ha frågor eller funktionsförslag skickar du en begäran på AskHDInsight med informationen och följer oss för fler uppdateringar i Azure HDInsight Community.
Change Data Capture (CDC) är en teknik som du kan använda för att spåra ändringar på radnivå i databastabeller som svar på åtgärder för att skapa, uppdatera och ta bort. I den här artikeln använder vi CDC Connectors för Apache Flink®, som erbjuder en uppsättning anslutningsappar för Apache Flink. Anslutningsapparna integrerar Debezium® som motor för att samla in dataändringarna.
Flink har stöd för att tolka Debezium JSON- och Avro-meddelanden som INSERT/UPDATE/DELETE-meddelanden i Apache Flink SQL-systemet.
Det här stödet är användbart i många fall för att:
- Synkronisera inkrementella data från databaser till andra system
- Granskningsloggar
- Skapa materialiserade vyer i realtid på databaser
- Visa ändringshistorik för temporal koppling för en databastabell
Nu ska vi lära oss hur du övervakar ändringar i PostgreSQL-tabellen med Flink-SQL CDC. Med PostgreSQL CDC-anslutningsappen kan du läsa ögonblicksbildsdata och inkrementella data från PostgreSQL-databasen.
Förutsättningar
- Azure PostgresSQL – flexibel server version 14.7
- Apache Flink-kluster i HDInsight på AKS
- Virtuell Linux-dator för att använda PostgreSQL-klienten
- Lägg till NSG-regeln som tillåter inkommande och utgående anslutningar på port 5432 i HDInsight i AKS-poolens undernät.
Förbereda PostgreSQL-tabell och klient
Installera PostgreSQL-klienten med hjälp av kommandona nedan med hjälp av en virtuell Linux-dator
sudo apt-get update sudo apt-get install postgresql-client
Installera certifikatet för att ansluta till PostgreSQL-servern med SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Anslut till servern (ersätt värd, användarnamn och databasnamn i enlighet med detta)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
När du har anslutit till databasen skapar du en exempeltabell
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Om du vill aktivera CDC på PostgreSQL-databasen måste du göra följande ändringar.
Skapa Apache Flink PostgreSQL CDC-tabell
Om du vill skapa tabellen Flink PostgreSQL CDC laddar du ned alla beroende jar-filer.
pom.xml
Använd filen med följande innehåll.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Använd maven-kommandot för att ladda ned alla beroende jar-filer
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Kommentar
- Om din webb-ssh-podd inte innehåller maven följer du länkarna för att ladda ned och installera den.
- För att ladda ned jsr jar-filen använder du följande kommando
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
När de beroende jar-filerna har laddats ned startar du Flink SQL-klienten med dessa jar-filer som ska importeras till sessionen. Slutför kommandot enligt följande.
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Dessa kommandon startar SQL-klienten med beroendena som,
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Skapa en Flink PostgreSQL CDC-tabell med hjälp av CDC-anslutningsappen
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Validering
Referens
- Apache Flink-webbplats
- PostgreSQL CDC Connector är licensierat under Apache 2.0-licens
- Apache, Apache Flink, Flink och associerade öppen källkod projektnamn är varumärken som tillhör Apache Software Foundation (ASF).