次の方法で共有


Apache Flink® を使用した PostgreSQL テーブルの変更データ キャプチャ (CDC)

Note

Azure HDInsight on AKS は 2025 年 1 月 31 日に廃止されます。 2025 年 1 月 31 日より前に、ワークロードを Microsoft Fabric または同等の Azure 製品に移行することで、ワークロードの突然の終了を回避する必要があります。 サブスクリプション上に残っているクラスターは停止され、ホストから削除されることになります。

提供終了日までは基本サポートのみが利用できます。

重要

現在、この機能はプレビュー段階にあります。 ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用されるその他の法律条項については、「Microsoft Azure プレビューの追加の使用条件」に記載されています。 この特定のプレビューについては、「Microsoft HDInsight on AKS のプレビュー情報」を参照してください。 質問や機能の提案については、詳細を記載した要求を AskHDInsight で送信してください。また、その他の更新情報については、Azure HDInsight コミュニティのフォローをお願いいたします。

変更データ キャプチャ (CDC) は、作成、更新、削除操作に応答して、データベース テーブル内の行レベルの変更を追跡するために使用できる手法です。 この記事で使用する CDC Connectors for Apache Flink® では、Apache Flink 用の一連のソース コネクタが提供されます。 これらのコネクタでは、データの変更をキャプチャするためのエンジンとして Debezium® が統合されています。

Flink では、Debezium JSON メッセージおよび Avro メッセージを、Apache Flink SQL システムへの INSERT/UPDATE/DELETE メッセージとして解釈することをサポートしています。

このサポートは多くの場合、次を行うのに役立ちます。

  • データベースから他のシステムに増分データを同期する
  • 監査ログ
  • データベースに対してリアルタイムの具体化されたビューを構築する
  • データベース テーブルのテンポラル結合変更履歴を表示する

次に、Flink-SQL CDC を使用して PostgreSQL テーブルの変更を監視する方法について説明します。 PostgreSQL CDC コネクタを使用すると、PostgreSQL データベースからスナップショット データと増分データを読み取ることができます。

前提条件

PostgreSQL テーブルとクライアントを準備する

  • Linux 仮想マシンで、次のコマンドを使用して PostgreSQL クライアントをインストールします

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • SSL を使用して PostgreSQL サーバーに接続するための証明書をインストールします

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • サーバーに接続します (それに応じてホスト、ユーザー名、データベース名を置き換えます)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • データベースに正常に接続したら、サンプル テーブルを作成します

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • PostgreSQL データベースで CDC を有効にするには、次の変更を行う必要があります。

    • WAL レベルを論理に変更する必要があります。 この値は、Azure portal のサーバー パラメーター セクションで変更できます。

      enable-cdc-on-postgres-database を行う方法を示すスクリーンショット。

    • テーブルにアクセスするユーザーには、"レプリケーション" ロールが追加されている必要があります

      レプリケーションが付与されたユーザー <username> に変更してください。

  • Flink PostgreSQL CDC テーブルを作成するには、すべての依存 jar をダウンロードします。 次の内容を含む pom.xml ファイルを作成します。

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • maven コマンドを使用してすべての依存 jar をダウンロードします

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    Note

    • Web SSH ポッドに maven が含まれていない場合は、リンクに従ってダウンロードしてインストールしてください。
    • jsr jar ファイルをダウンロードするには、次のコマンドを使用します
      • wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
  • 依存 jar をダウンロードしたら、Flink SQL クライアントを起動し、これらの jar をセッションにインポートします。 コマンドを次のとおりに実行します。

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    これらのコマンドでは、次のように依存関係を使用して SQL クライアントを起動します。

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • CDC コネクタを使用して Flink PostgreSQL CDC テーブルを作成します

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

検証

  • 'select *' コマンドを実行して変更を監視します。

    select * from shipments;

    run-select-command を行う方法を示すスクリーンショット。

リファレンス