共用方式為


使用 Apache Flink® 的 PostgreSQL 數據表異動數據擷取 (CDC)

重要

AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解

您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。

重要

這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需此特定預覽的相關資訊,請參閱 AKS 預覽資訊上的 Azure HDInsight。 如有問題或功能建議,請在 AskHDInsight 提交請求,並關注我們以獲取 Azure HDInsight 社群更多更新。

異動數據擷取 (CDC) 是一種技術,可用來追蹤資料庫數據表中的數據列層級變更,以回應建立、更新和刪除作業。 在本文中,我們會針對 Apache Flink 使用CDC 連接器,此連接器提供一組適用於 Apache Flink® 的來源連接器。 連接器將 Debezium® 整合成引擎以捕捉數據變更。

Flink 支援將 Debezium JSON 和 Avro 訊息解譯為 INSERT/UPDATE/DELETE 訊息至 Apache Flink SQL 系統。

在許多情況下,這項支援很有用:

  • 將累加數據從資料庫同步處理至其他系統
  • 稽核記錄
  • 在資料庫上建置即時具體化檢視
  • 檢視資料表的時間聯結變更歷史

現在,讓我們瞭解如何使用 Flink-SQL CDC 監視 PostgreSQL 數據表上的變更。 PostgreSQL CDC 連接器可讓您從 PostgreSQL 資料庫讀取快照集數據和增量數據。

先決條件

準備 PostgreSQL 資料表 & 用戶端

  • 使用 Linux 虛擬機,使用下列命令安裝 PostgreSQL 用戶端

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • 安裝憑證以使用 SSL 連線到 PostgreSQL 伺服器

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • 連線到伺服器(據以取代主機、使用者名稱與資料庫名稱)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • 成功連線到資料庫之後,請建立範例數據表

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • 若要在 PostgreSQL 資料庫上啟用 CDC,您必須進行下列變更。

    • WAL 層級必須變更為 邏輯。 您可以在 Azure 入口網站的 [伺服器參數] 區段中變更此值。

      顯示如何在 PostgreSQL 資料庫上啟用 CDC 的螢幕截圖。

    • 存取數據表的用戶必須新增 「複寫」角色

      ALTER USER <username> WITH REPLICATION;

  • 若要建立 Flink PostgreSQL CDC 數據表,請下載所有相依 jar。 使用具有下列內容的 pom.xml 檔案。

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • 使用 maven 命令下載所有相依的 JAR 檔案

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    注意

  • 下載相依 jar 之後,啟動 Flink SQL 用戶端,並將這些 jar 匯入會話。 請按以下指示完成命令:

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    這些命令會連同相依性啟動 sql 用戶端,

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • 使用 CDC 連接器建立 Flink PostgreSQL CDC 數據表

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

驗證

  • 執行 'select *' 命令來監視變更。

    select * from shipments;

    顯示如何執行 select-command 的螢幕快照。

參考