Change Data Capture (CDC) della tabella PostgreSQL con Apache Flink®
Nota
Azure HDInsight su AKS verrà ritirato il 31 gennaio 2025. Prima del 31 gennaio 2025, sarà necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare interruzioni improvvise dei carichi di lavoro. I cluster rimanenti nella sottoscrizione verranno arrestati e rimossi dall’host.
Solo il supporto di base sarà disponibile fino alla data di ritiro.
Importante
Questa funzionalità è attualmente disponibile solo in anteprima. Le Condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure includono termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale. Per informazioni su questa anteprima specifica, vedere Informazioni sull'anteprima di Azure HDInsight nel servizio Azure Kubernetes. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire Microsoft per altri aggiornamenti nella Community di Azure HDInsight.
Change Data Capture (CDC) è una tecnica che consente di tenere traccia delle modifiche a livello di riga nelle tabelle di database in risposta a operazioni di creazione, aggiornamento ed eliminazione. In questo articolo vengono usati CDC Connectors for Apache Flink®, che offrono un set di connettori di origine per Apache Flink. I connettori integrano Debezium® come motore per acquisire le modifiche ai dati.
Flink supporta l'interpretazione dei messaggi JSON e Avro Debezium come messaggi INSERT/UPDATE/DELETE nel sistema SQL Apache Flink.
Questo supporto è utile in molti casi per:
- Sincronizzare i dati incrementali dai database ad altri sistemi
- Log di controllo
- Creare viste materializzate in tempo reale nei database
- Visualizzare la cronologia delle modifiche del join temporale di una tabella di database
Si apprenderà ora come monitorare le modifiche nella tabella PostgreSQL usando Flink-SQL CDC. Il connettore PostgreSQL CDC consente di leggere i dati degli snapshot e i dati incrementali dal database PostgreSQL.
Prerequisiti
- Server flessibile di Azure PostgresSQL versione 14.7
- Cluster Apache Flink in HDInsight su AKS
- Macchina virtuale Linux per usare il client PostgreSQL
- Aggiungere la regola del gruppo di sicurezza di rete che consente le connessioni in ingresso e in uscita sulla porta 5432 nella subnet del pool HDInsight su AKS.
Preparare la tabella PostgreSQL e il client
Usando una macchina virtuale Linux, installare il client PostgreSQL tramite i comandi seguenti
sudo apt-get update sudo apt-get install postgresql-client
Installare il certificato per connettersi al server PostgreSQL tramite SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Connettersi al server (sostituire host, nome utente e database di conseguenza)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Dopo la connessione al database, creare una tabella di esempio
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Per abilitare CDC nel database PostgreSQL, è necessario apportare le modifiche seguenti.
Creare una tabella Apache Flink PostgreSQL CDC
Per creare la tabella Flink PostgreSQL CDC, scaricare tutti i file JAR dipendenti. Usare il file
pom.xml
con il contenuto seguente.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Usare il comando maven per scaricare tutti i file JAR dipendenti
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Nota
- Se il pod SSH Web non contiene maven, seguire i collegamenti per scaricarlo e installarlo.
- Per scaricare il file JAR JSR, usare il comando seguente
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Dopo aver scaricato i file JAR dipendenti, avviare il client Flink SQL con questi file JAR da importare nella sessione. Completare il comando come indicato di seguito.
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Questi comandi avviano il client SQL con le dipendenze come,
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Creare una tabella Flink PostgreSQL CDC usando il connettore CDC
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Convalida
Riferimento
- Sito Web di Apache Flink
- PostgreSQL CDC Connector è concesso in licenza Apache 2.0
- Apache, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi di Apache Software Foundation (ASF).