使用 Apache Flink® 的 PostgreSQL 數據表異動數據擷取 (CDC)
重要
AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解。
您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。
重要
這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需此特定預覽的相關資訊,請參閱 AKS 預覽資訊上的 Azure HDInsight。 如有問題或功能建議,請在 AskHDInsight 提交請求,並關注我們以獲取 Azure HDInsight 社群更多更新。
異動數據擷取 (CDC) 是一種技術,可用來追蹤資料庫數據表中的數據列層級變更,以回應建立、更新和刪除作業。 在本文中,我們會針對 Apache Flink 使用CDC 連接器,此連接器提供一組適用於 Apache Flink® 的來源連接器。 連接器將 Debezium® 整合成引擎以捕捉數據變更。
Flink 支援將 Debezium JSON 和 Avro 訊息解譯為 INSERT/UPDATE/DELETE 訊息至 Apache Flink SQL 系統。
在許多情況下,這項支援很有用:
- 將累加數據從資料庫同步處理至其他系統
- 稽核記錄
- 在資料庫上建置即時具體化檢視
- 檢視資料表的時間聯結變更歷史
現在,讓我們瞭解如何使用 Flink-SQL CDC 監視 PostgreSQL 數據表上的變更。 PostgreSQL CDC 連接器可讓您從 PostgreSQL 資料庫讀取快照集數據和增量數據。
先決條件
- Azure PostgresSQL 彈性伺服器 14.7 版
- HDInsight 上基於 AKS 的 Apache Flink 叢集
- Linux 虛擬機使用 PostgreSQL 的用戶端
- 新增 NSG 規則,以允許 AKS 集區子網上的 HDInsight 連接埠 5432 上的輸入和輸出連線。
準備 PostgreSQL 資料表 & 用戶端
使用 Linux 虛擬機,使用下列命令安裝 PostgreSQL 用戶端
sudo apt-get update sudo apt-get install postgresql-client
安裝憑證以使用 SSL 連線到 PostgreSQL 伺服器
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
連線到伺服器(據以取代主機、使用者名稱與資料庫名稱)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
成功連線到資料庫之後,請建立範例數據表
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
若要在 PostgreSQL 資料庫上啟用 CDC,您必須進行下列變更。
建立 Apache Flink PostgreSQL CDC 數據表
若要建立 Flink PostgreSQL CDC 數據表,請下載所有相依 jar。 使用具有下列內容的
pom.xml
檔案。<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
使用 maven 命令下載所有相依的 JAR 檔案
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
注意
- 如果您的 Web ssh Pod 不包含 maven,請遵循連結來下載並安裝。
- 若要下載 jsr jar 檔案,請使用下列命令
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
下載相依 jar 之後,啟動 Flink SQL 用戶端,並將這些 jar 匯入會話。 請按以下指示完成命令:
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
這些命令會連同相依性啟動 sql 用戶端,
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
使用 CDC 連接器建立 Flink PostgreSQL CDC 數據表
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
驗證
參考
- Apache Flink 網站
- PostgreSQL CDC 連接器 是在 Apache 2.0 授權下 授權
- Apache、Apache Flink、Flink 和相關聯的開放原始碼專案名稱是 Apache Software Foundation(ASF)的商標。