Apache Flink® を使用した PostgreSQL テーブルの変更データ キャプチャ (CDC)
Note
Azure HDInsight on AKS は 2025 年 1 月 31 日に廃止されます。 2025 年 1 月 31 日より前に、ワークロードを Microsoft Fabric または同等の Azure 製品に移行することで、ワークロードの突然の終了を回避する必要があります。 サブスクリプション上に残っているクラスターは停止され、ホストから削除されることになります。
提供終了日までは基本サポートのみが利用できます。
重要
現在、この機能はプレビュー段階にあります。 ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用されるその他の法律条項については、「Microsoft Azure プレビューの追加の使用条件」に記載されています。 この特定のプレビューについては、「Microsoft HDInsight on AKS のプレビュー情報」を参照してください。 質問や機能の提案については、詳細を記載した要求を AskHDInsight で送信してください。また、その他の更新情報については、Azure HDInsight コミュニティのフォローをお願いいたします。
変更データ キャプチャ (CDC) は、作成、更新、削除操作に応答して、データベース テーブル内の行レベルの変更を追跡するために使用できる手法です。 この記事で使用する CDC Connectors for Apache Flink® では、Apache Flink 用の一連のソース コネクタが提供されます。 これらのコネクタでは、データの変更をキャプチャするためのエンジンとして Debezium® が統合されています。
Flink では、Debezium JSON メッセージおよび Avro メッセージを、Apache Flink SQL システムへの INSERT/UPDATE/DELETE メッセージとして解釈することをサポートしています。
このサポートは多くの場合、次を行うのに役立ちます。
- データベースから他のシステムに増分データを同期する
- 監査ログ
- データベースに対してリアルタイムの具体化されたビューを構築する
- データベース テーブルのテンポラル結合変更履歴を表示する
次に、Flink-SQL CDC を使用して PostgreSQL テーブルの変更を監視する方法について説明します。 PostgreSQL CDC コネクタを使用すると、PostgreSQL データベースからスナップショット データと増分データを読み取ることができます。
前提条件
- Azure PostgresSQL フレキシブル サーバー バージョン 14.7
- AKS 上の HDInsight での Apache Flink クラスター
- PostgreSQL クライアントを使用する Linux 仮想マシン
- AKS プール サブネット上の HDInsight のポート 5432 で受信接続と送信接続を許可する NSG 規則を追加します。
PostgreSQL テーブルとクライアントを準備する
Linux 仮想マシンで、次のコマンドを使用して PostgreSQL クライアントをインストールします
sudo apt-get update sudo apt-get install postgresql-client
SSL を使用して PostgreSQL サーバーに接続するための証明書をインストールします
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
サーバーに接続します (それに応じてホスト、ユーザー名、データベース名を置き換えます)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
データベースに正常に接続したら、サンプル テーブルを作成します
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
PostgreSQL データベースで CDC を有効にするには、次の変更を行う必要があります。
Apache Flink PostgreSQL CDC テーブルを作成する
Flink PostgreSQL CDC テーブルを作成するには、すべての依存 jar をダウンロードします。 次の内容を含む
pom.xml
ファイルを作成します。<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
maven コマンドを使用してすべての依存 jar をダウンロードします
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Note
- Web SSH ポッドに maven が含まれていない場合は、リンクに従ってダウンロードしてインストールしてください。
- jsr jar ファイルをダウンロードするには、次のコマンドを使用します
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
依存 jar をダウンロードしたら、Flink SQL クライアントを起動し、これらの jar をセッションにインポートします。 コマンドを次のとおりに実行します。
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
これらのコマンドでは、次のように依存関係を使用して SQL クライアントを起動します。
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar| ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
CDC コネクタを使用して Flink PostgreSQL CDC テーブルを作成します
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
検証
リファレンス
- Apache Flink Web サイト
- PostgreSQL CDC コネクタは、Apache 2.0 ライセンスでライセンスされます
- Apache、Apache Flink、Flink、関連するオープン ソース プロジェクト名は、Apache Software Foundation (ASF) の商標です。