Captura de datos modificados (CDC) de tablas de PostgreSQL con Apache Flink®
Nota:
Retiraremos Azure HDInsight en AKS el 31 de enero de 2025. Antes del 31 de enero de 2025, deberá migrar las cargas de trabajo a Microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo. Los clústeres restantes de la suscripción se detendrán y quitarán del host.
Solo el soporte técnico básico estará disponible hasta la fecha de retirada.
Importante
Esta funcionalidad actualmente está en su versión preliminar. En Términos de uso complementarios para las versiones preliminares de Microsoft Azure encontrará más términos legales que se aplican a las características de Azure que están en versión beta, en versión preliminar, o que todavía no se han lanzado con disponibilidad general. Para más información sobre esta versión preliminar específica, consulte la Información de Azure HDInsight sobre la versión preliminar de AKS. Para plantear preguntas o sugerencias sobre la característica, envíe una solicitud en AskHDInsight con los detalles y síganos en la comunidad de Azure HDInsight para obtener más actualizaciones.
La captura de datos modificados (CDC) es una técnica que puede usar para realizar un seguimiento de los cambios de nivel de fila en las tablas de base de datos en respuesta a las operaciones de creación, actualización y eliminación. En este artículo, usamos conectores CDC para Apache Flink®, que ofrecen un conjunto de conectores de origen para Apache Flink. Los conectores integran Debezium® como el motor para capturar los cambios de datos.
Flink admite la interpretación de los mensajes JSON y Avro de Debezium como mensajes INSERT/UPDATE/DELETE en el sistema SQL de Apache Flink.
Esta compatibilidad es útil en muchos casos para:
- Sincronizar datos incrementales de bases de datos a otros sistemas
- Registros de auditoría
- Creación de vistas materializadas en tiempo real en bases de datos
- Visualización del historial de cambios de combinación temporal de una tabla de base de datos
Ahora, vamos a aprender a supervisar los cambios en la tabla PostgreSQL mediante CDC de Flink-SQL. El conector CDC de PostgreSQL permite leer datos de instantáneas y datos incrementales de la base de datos PostgreSQL.
Requisitos previos
- servidor flexible de Azure PostgresSQL Versión 14.7
- Clúster de Apache Flink en HDInsight en AKS
- Máquina virtual Linux para usar el cliente PostgreSQL
- Agregue la regla de NSG que permite conexiones entrantes y salientes en el puerto 5432 en HDInsight en la subred del grupo de AKS.
Preparar tabla y cliente de PostgreSQL
Con una máquina virtual Linux, instale el cliente PostgreSQL mediante los siguientes comandos
sudo apt-get update sudo apt-get install postgresql-client
Instalación del certificado para conectarse al servidor PostgreSQL mediante SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Conéctese al servidor (reemplace el host, el nombre de usuario y el nombre de la base de datos en consecuencia)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Después de conectarse correctamente a la base de datos, cree una tabla de ejemplo
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Para habilitar CDC en la base de datos PostgreSQL, es necesario realizar los siguientes cambios.
Creación de una tabla CDC de PostgreSQL de Apache Flink
Para crear una tabla CDC de PostgreSQL de Flink, descargue todos los archivos JAR dependientes. Use el archivo
pom.xml
con el siguiente contenido.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Use el comando maven para descargar todos los archivos JAR dependientes
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Nota:
- Si el pod ssh web no contiene maven, siga los vínculos para descargarlo e instalarlo.
- Para descargar el archivo jar de jsr, use el siguiente comando
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Una vez descargados los archivos JAR dependientes, inicie el cliente de SQL de Flink, con estos archivos jar que se importarán en la sesión. Complete el comando como se indica a continuación,
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Estos comandos inician el cliente sql con las dependencias como,
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Creación de una tabla CDC de PostgreSQL de Flink mediante el conector CDC
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Validation
Referencia
- Sitio web de Apache Flink
- Conector CDC de PostgreSQL tiene licencia en Licencia de Apache 2.0
- Apache, Apache Flink, Flink y los nombres de proyecto de código abierto asociados son marcas comerciales de Apache Software Foundation (ASF).