次の方法で共有


Apache Flink® を使用して PostgreSQL テーブルのデータ キャプチャ (CDC) を変更する

大事な

AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 この発表 を通じてについて詳しく知ることができます。

ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。

大事な

この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案については、詳細を添えて AskHDInsight にリクエストを送信し、Azure HDInsight Communityをフォローして、さらなる更新情報を入手してください。

Change Data Capture (CDC) は、作成、更新、および削除操作に応答してデータベース テーブルの行レベルの変更を追跡するために使用できる手法です。 この記事では、Apache Flink 用 CDC Connectors for Apache Flink®を使用します。これは、Apache Flink 用の一連のソース コネクタを提供します。 コネクタは、Debezium® をエンジンとして統合して、データの変更をキャプチャします。

Flink は、Debezium JSON および Avro メッセージを INSERT/UPDATE/DELETE メッセージとして Apache Flink SQL システムに解釈することをサポートしています。

このサポートは、多くの場合、次の場合に役立ちます。

  • データベースから他のシステムに増分データを同期する
  • 監査ログ
  • データベースに対してリアルタイムの具体化されたビューを構築する
  • データベース テーブルのテンポラル結合変更履歴を表示する

次に、Flink-SQL CDC を使用して PostgreSQL テーブルの変更を監視する方法について説明します。 PostgreSQL CDC コネクタを使用すると、PostgreSQL データベースからスナップショット データと増分データを読み取ることができます。

前提 条件

PostgreSQL テーブル & クライアントを準備する

  • Linux 仮想マシンを使用して、次のコマンドを使用して PostgreSQL クライアントをインストールする

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • SSL を使用して PostgreSQL サーバーに接続するための証明書をインストールする

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • サーバーに接続します (それに応じてホスト、ユーザー名、データベース名を置き換えます)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • データベースに正常に接続したら、サンプル テーブルを作成します

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • PostgreSQL データベースで CDC を有効にするには、次の変更を行う必要があります。

    • WAL レベルは 論理に変更する必要があります。 この値は、Azure portal のサーバー パラメーター セクションで変更できます。

      -cdc-on-postgres-database を有効にする方法を示すスクリーンショット。

    • テーブルにアクセスするユーザーには、"REPLICATION" ロールが追加されている必要があります

      ALTER USER <username> WITH REPLICATION;

  • Flink PostgreSQL CDC テーブルを作成するには、すべての依存 jar をダウンロードします。 次の内容で pom.xml ファイルを使用します。

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • maven コマンドを使用してすべての依存 jar をダウンロードする

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    手記

    • Web ssh ポッドに maven が含まれていない場合は、リンクに従ってダウンロードしてインストールしてください。
    • jsr jar ファイルをダウンロードするには、次のコマンドを使用します。
      • wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
  • 依存 jar がダウンロードされたら、Flink SQL クライアントを開始し、これらの jar をセッションにインポートします。 次のようにコマンドを完了します。

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    これらのコマンドは、次のように依存関係を使用して SQL クライアントを起動します。

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • CDC コネクタを使用して Flink PostgreSQL CDC テーブルを作成する

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

検証

  • 'select *' コマンドを実行して変更を監視します。

    select * from shipments;

    run-select-command を実行する方法を示すスクリーンショット。

参考