使用 Apache Flink® 对 PostgreSQL 表进行数据变更捕获(CDC)
重要
AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 了解此公告的详细信息。
需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。
重要
此功能目前以预览版提供。 Microsoft Azure 预览版补充使用条款 包括适用于尚处于 beta 版、预览版或尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 Azure HDInsight 在 AKS 上的预览版信息。 有关问题或功能建议,请在 AskHDInsight 上提交请求并提供详细信息,关注我们以获取 Azure HDInsight 社区的更多更新。
变更数据捕获(CDC)是一种技术,可以用于记录数据库表中因创建、更新和删除操作而引起的行级更改。 在本文中,我们使用适用于 Apache Flink 的CDC 连接器,该连接器为 Apache Flink® 提供了一组源连接器。 连接器将 Debezium® 集成为引擎来捕获数据更改。
Flink 支持将 Debezium JSON 和 Avro 消息转换为 INSERT/UPDATE/DELETE 消息,并将其注入到 Apache Flink SQL 系统中。
在许多情况下,此支持非常有用:
- 将增量数据从数据库同步到其他系统
- 审核日志
- 在数据库上生成实时具体化视图
- 查看数据库表中时态联接的更改历史记录
现在,让我们了解如何使用 Flink-SQL CDC 监视 PostgreSQL 表上的更改。 PostgreSQL CDC 连接器允许从 PostgreSQL 数据库读取快照数据和增量数据。
先决条件
- Azure PostgresSQL 灵活服务器版本 14.7
- AKS 上的 HDInsight 上的 Apache Flink 群集
- 使用 PostgreSQL 客户端的 Linux 虚拟机
- 添加允许 HDInsight 在 AKS 池子子网中端口 5432 上进行入站和出站连接的 NSG 规则。
& 客户端准备 PostgreSQL 表
使用 Linux 虚拟机,使用以下命令安装 PostgreSQL 客户端
sudo apt-get update sudo apt-get install postgresql-client
安装证书以使用 SSL 连接到 PostgreSQL 服务器
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
连接到服务器(相应地替换主机、用户名和数据库名称)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
成功连接到数据库后,创建示例表
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
若要在 PostgreSQL 数据库上启用 CDC,需要进行以下更改。
创建 Apache Flink PostgreSQL CDC 表
若要创建 Flink PostgreSQL CDC 表,请下载所有依赖 jar。 将
pom.xml
文件与以下内容一起使用。<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
使用 maven 命令下载所有依赖 jar
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
注意
- 如果 Web ssh Pod 不包含 maven,请按照链接下载并安装它。
- 若要下载 jsr jar 文件,请使用以下命令
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
下载从属 jar 后,启动 Flink SQL 客户端,并将这些 jar 导入到会话中。 完成命令,如下所示:
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
这些命令使用依赖项启动 sql 客户端,如下所示:
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
使用 CDC 连接器创建 Flink PostgreSQL CDC 表
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
验证
参考
- Apache Flink 网站
- PostgreSQL CDC 连接器 根据 Apache 2.0 许可证 获得许可
- Apache、Apache Flink、Flink 和关联的开源项目名称 Apache Software Foundation(ASF)的 商标。