Captura de datos modificados (CDC) de la tabla postgreSQL mediante Apache Flink®
Importante
Azure HDInsight en AKS se retiró el 31 de enero de 2025. Obtenga más información con este anuncio.
Debe migrar las cargas de trabajo a microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo.
Importante
Esta característica está actualmente en versión preliminar. Los Términos de uso complementarios para las versiones preliminares de Microsoft Azure incluyen más términos legales que se aplican a las características de Azure que se encuentran en versión beta, en versión preliminar o, de lo contrario, aún no se han publicado en disponibilidad general. Para obtener información sobre esta versión preliminar específica, consulte información sobre la versión preliminar de Azure HDInsight en AKS. Para preguntas o sugerencias de características, envíe una solicitud en AskHDInsight con los detalles y síganos para obtener más actualizaciones sobre Comunidad de Azure HDInsight.
La captura de datos modificados (CDC) es una técnica que puede usar para realizar un seguimiento de los cambios de nivel de fila en las tablas de base de datos en respuesta a las operaciones de creación, actualización y eliminación. En este artículo, usamos conectores CDC para Apache Flink®, que ofrecen un conjunto de conectores de origen para Apache Flink. Los conectores integran Debezium® como motor para capturar los cambios de datos.
Flink admite la interpretación de los mensajes JSON y Avro de Debezium como mensajes INSERT/UPDATE/DELETE en el sistema SQL de Apache Flink.
Esta compatibilidad es útil en muchos casos para:
- Sincronizar datos incrementales de bases de datos a otros sistemas
- Registros de auditoría
- Creación de vistas materializadas en tiempo real en bases de datos
- Visualización del historial de cambios de combinación temporal de una tabla de base de datos
Ahora, vamos a aprender a supervisar los cambios en la tabla postgreSQL mediante Flink-SQL CDC. El conector CDC de PostgreSQL permite leer datos de instantáneas y datos incrementales de la base de datos postgreSQL.
Prerrequisitos
- servidor flexible de Azure PostgresSQL versión 14.7
- clúster de Apache Flink en HDInsight en AKS
- Máquina virtual Linux para usar el cliente postgreSQL
- Agregue la regla de NSG que permite conexiones entrantes y salientes en el puerto 5432 en HDInsight en la subred del grupo de AKS.
Preparación de la tabla de PostgreSQL & Client
Con una máquina virtual Linux, instale el cliente postgreSQL mediante los siguientes comandos.
sudo apt-get update sudo apt-get install postgresql-client
Instalación del certificado para conectarse al servidor postgreSQL mediante SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Conéctese al servidor (reemplace el host, el nombre de usuario y el nombre de la base de datos en consecuencia).
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Después de conectarse correctamente a la base de datos, cree una tabla de ejemplo.
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Para habilitar CDC en la base de datos postgreSQL, es necesario realizar los siguientes cambios.
Creación de una tabla CDC de PostgreSQL de Apache Flink
Para crear una tabla CDC de PostgreSQL de Flink, descargue todos los archivos JAR dependientes. Use el archivo
pom.xml
con el siguiente contenido.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Use el comando maven para descargar todos los archivos JAR dependientes.
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Nota
- Si el pod ssh web no contiene maven, siga los vínculos para descargarlo e instalarlo.
- Para descargar el archivo jar de jsr, use el siguiente comando
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Una vez descargados los archivos JAR dependientes, inicie el cliente de Flink SQL, con estos archivos JAR que se importarán en la sesión. Complete el comando como se indica a continuación,
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Estos comandos inician el cliente sql con las dependencias como:
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Creación de una tabla CDC de PostgreSQL de Flink mediante el conector CDC
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Validación
Referencia
- Sitio web de Apache Flink
- el conector CDC de PostgreSQL tiene licencia bajo la Licencia de Apache 2.0
- Los nombres de proyecto de código abierto asociados de Apache, Apache Flink, Apache Flink y son marcas comerciales de la Apache Software Foundation (ASF).