Capture de données modifiées (CDC) de la table PostgreSQL à l’aide d’Apache Flink®
Important
Azure HDInsight sur AKS a été mis hors service le 31 janvier 2025. Découvrez-en plus grâce à cette annonce.
Vous devez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent pour éviter l’arrêt brusque de vos charges de travail.
Important
Cette fonctionnalité est actuellement en préversion. Les Conditions d’utilisation supplémentaires pour les préversions Microsoft Azure incluent des termes juridiques supplémentaires qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou qui ne sont pas encore publiées en disponibilité générale. Pour des informations concernant cette préversion spécifique, consultez les informations sur la préversion d'Azure HDInsight sur AKS . Pour des questions ou des suggestions de fonctionnalités, envoyez une demande sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur Communauté Azure HDInsight.
Capture de données modifiées (CDC) est une technique que vous pouvez utiliser pour suivre les modifications au niveau des lignes dans les tables de base de données en réponse aux opérations de création, de mise à jour et de suppression. Dans cet article, nous utilisons connecteurs CDC pour Apache Flink®, qui offrent un ensemble de connecteurs de source pour Apache Flink®. Les connecteurs intègrent Debezium® comme moteur pour la captation des changements de données.
Flink prend en charge l’interprétation des messages JSON et Avro Debezium en tant que messages INSERT/UPDATE/DELETE dans le système SQL Apache Flink.
Cette prise en charge est utile dans de nombreux cas pour :
- Synchroniser des données incrémentielles à partir de bases de données vers d’autres systèmes
- Journaux d’audit
- Créer des vues matérialisées en temps réel sur des bases de données
- Afficher l’historique des modifications de jointure temporelle d’une table de base de données
À présent, nous allons apprendre à surveiller les modifications sur la table PostgreSQL à l’aide de Flink-SQL cdc. Le connecteur CDC PostgreSQL permet de lire des données d’instantané et des données incrémentielles à partir de la base de données PostgreSQL.
Conditions préalables
- serveur flexible Azure PostgresSQL version 14.7
- cluster Apache Flink sur HDInsight sur AKS
- Machine virtuelle Linux pour utiliser le client PostgreSQL
- Ajoutez la règle de groupe de sécurité réseau qui autorise les connexions entrantes et sortantes sur le port 5432 dans HDInsight sur le sous-réseau du pool AKS.
Préparer la table PostgreSQL & Client
À l’aide d’une machine virtuelle Linux, installez le client PostgreSQL à l’aide des commandes ci-dessous
sudo apt-get update sudo apt-get install postgresql-client
Installer le certificat pour se connecter au serveur PostgreSQL à l’aide de SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Se connecter au serveur (remplacer l’hôte, le nom d’utilisateur et le nom de la base de données en conséquence)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Après la connexion à la base de données, créez un exemple de table
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Pour activer la capture de données modifiées (CDC) sur une base de données PostgreSQL, vous devez apporter les modifications suivantes.
Créer une table de CDC d'Apache Flink pour PostgreSQL
Pour créer une table CDC Flink PostgreSQL, téléchargez tous les fichiers jar dépendants. Utilisez le fichier
pom.xml
avec le contenu suivant.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Utiliser la commande maven pour télécharger tous les fichiers jar dépendants
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Remarque
- Si votre pod ssh web ne contient pas maven, suivez les liens pour le télécharger et l’installer.
- Pour télécharger le fichier jar jsr, utilisez la commande suivante
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Une fois que les fichiers jar dépendants sont téléchargés, démarrez le client Flink SQL, avec ces fichiers jar à importer dans la session. Complétez la commande comme suit :
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Ces commandes démarrent le client sql avec les dépendances comme suit :
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Utiliser le connecteur CDC pour créer une table CDC Flink PostgreSQL
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Validation
Référence
- Site Web Apache Flink
- Connecteur CDC PostgreSQL est concédé sous licence sous Licence Apache 2.0
- Apache, Apache Flink, Flink et les noms de projets open source associés sont marques déposées de l'Apache Software Foundation (ASF).