Apache Flink® を使用して PostgreSQL テーブルのデータ キャプチャ (CDC) を変更する
大事な
AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 この発表 を通じてについて詳しく知ることができます。
ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。
大事な
この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案については、詳細を添えて AskHDInsight にリクエストを送信し、Azure HDInsight Communityをフォローして、さらなる更新情報を入手してください。
Change Data Capture (CDC) は、作成、更新、および削除操作に応答してデータベース テーブルの行レベルの変更を追跡するために使用できる手法です。 この記事では、Apache Flink 用 CDC Connectors for Apache Flink®を使用します。これは、Apache Flink 用の一連のソース コネクタを提供します。 コネクタは、Debezium® をエンジンとして統合して、データの変更をキャプチャします。
Flink は、Debezium JSON および Avro メッセージを INSERT/UPDATE/DELETE メッセージとして Apache Flink SQL システムに解釈することをサポートしています。
このサポートは、多くの場合、次の場合に役立ちます。
- データベースから他のシステムに増分データを同期する
- 監査ログ
- データベースに対してリアルタイムの具体化されたビューを構築する
- データベース テーブルのテンポラル結合変更履歴を表示する
次に、Flink-SQL CDC を使用して PostgreSQL テーブルの変更を監視する方法について説明します。 PostgreSQL CDC コネクタを使用すると、PostgreSQL データベースからスナップショット データと増分データを読み取ることができます。
前提 条件
- Azure PostgresSQL フレキシブル サーバー バージョン 14.7
- HDInsight 上の AKS での Apache Flink クラスター
- PostgreSQL クライアントを使用する Linux 仮想マシン
- AKS プール サブネット上の HDInsight のポート 5432 で受信接続と送信接続を許可する NSG 規則を追加します。
PostgreSQL テーブル & クライアントを準備する
Linux 仮想マシンを使用して、次のコマンドを使用して PostgreSQL クライアントをインストールする
sudo apt-get update sudo apt-get install postgresql-client
SSL を使用して PostgreSQL サーバーに接続するための証明書をインストールする
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
サーバーに接続します (それに応じてホスト、ユーザー名、データベース名を置き換えます)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
データベースに正常に接続したら、サンプル テーブルを作成します
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
PostgreSQL データベースで CDC を有効にするには、次の変更を行う必要があります。
Apache Flink PostgreSQL CDC テーブルを作成する
Flink PostgreSQL CDC テーブルを作成するには、すべての依存 jar をダウンロードします。 次の内容で
pom.xml
ファイルを使用します。<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
maven コマンドを使用してすべての依存 jar をダウンロードする
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
手記
- Web ssh ポッドに maven が含まれていない場合は、リンクに従ってダウンロードしてインストールしてください。
- jsr jar ファイルをダウンロードするには、次のコマンドを使用します。
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
依存 jar がダウンロードされたら、Flink SQL クライアントを開始し、これらの jar をセッションにインポートします。 次のようにコマンドを完了します。
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
これらのコマンドは、次のように依存関係を使用して SQL クライアントを起動します。
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
CDC コネクタを使用して Flink PostgreSQL CDC テーブルを作成する
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
検証
参考
- Apache Flink ウェブサイト
- PostgreSQL CDC Connector は、Apache 2.0 ライセンス でライセンスされます
- Apache、Apache Flink、Flink、および関連するオープン ソース プロジェクト名は、Apache Software Foundation (ASF) の 商標です。