Změna zachytávání dat (CDC) tabulky PostgreSQL pomocí Apache Flinku®
Důležitý
Azure HDInsight v AKS byl vyřazen 31. ledna 2025. Zjistěte více v tomto oznámení.
Abyste se vyhnuli náhlému ukončení úloh, musíte migrovat úlohy do Microsoft Fabric nebo ekvivalentního produktu Azure.
Důležitý
Tato funkce je aktuálně ve verzi Preview. doplňkové podmínky použití pro verze Preview Microsoft Azure obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta verzi, ve verzi Preview nebo ještě nebyly vydány v obecné dostupnosti. Informace o této konkrétní ukázce najdete v Azure HDInsight ve verzi preview ve službě AKS. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás pro další aktualizace na komunitu Azure HDInsight .
Change Data Capture (CDC) je technika, kterou můžete použít ke sledování změn na úrovni řádků v databázových tabulkách v reakci na operace vytváření, aktualizace a odstraňování. V tomto článku používáme konektory CDC pro Apache Flink®, které nabízejí sadu zdrojových konektorů pro Apache Flink. Konektory integrují Debezium® jako stroj pro zachycení změn dat.
Flink podporuje interpretaci zpráv Debezium JSON a Avro jako insert/UPDATE/DELETE zpráv do systému Apache Flink SQL.
Tato podpora je užitečná v mnoha případech pro:
- Synchronizace přírůstkových dat z databází do jiných systémů
- Protokoly auditu
- Vytváření materializovaných zobrazení v reálném čase v databázích
- Zobrazení historie změn časového spojení databázové tabulky
Teď se dozvíme, jak monitorovat změny v tabulce PostgreSQL pomocí Flink-SQL CDC. Konektor PostgreSQL CDC umožňuje čtení dat snímků a přírůstkových dat z databáze PostgreSQL.
Požadavky
- Azure PostgreSQL flexibilní server verze 14.7
- clusteru Apache Flink ve službě HDInsight ve službě AKS
- Virtuální počítač s Linuxem pro použití klienta PostgreSQL
- Přidejte pravidlo NSG, které umožňuje příchozí a odchozí připojení na portu 5432 v HDInsight v podsíti fondu AKS.
Příprava tabulky PostgreSQL & Client
Pomocí virtuálního počítače s Linuxem nainstalujte klienta PostgreSQL pomocí následujících příkazů.
sudo apt-get update sudo apt-get install postgresql-client
Instalace certifikátu pro připojení k serveru PostgreSQL pomocí SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Připojte se k serveru (odpovídajícím způsobem nahraďte hostitele, uživatelské jméno a název databáze).
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Po úspěšném připojení k databázi vytvořte ukázkovou tabulku.
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Pokud chcete povolit CDC v databázi PostgreSQL, musíte provést následující změny.
Vytvoření tabulky Apache Flink PostgreSQL CDC
Pokud chcete vytvořit tabulku Flink PostgreSQL CDC, stáhněte všechny závislé soubory JAR. Použijte soubor
pom.xml
s následujícím obsahem.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Použití příkazu maven ke stažení všech závislých souborů JAR
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Poznámka
- Pokud váš pod web ssh neobsahuje maven, postupujte podle odkazů ke stažení a instalaci.
- Ke stažení souboru jsr jar použijte následující příkaz.
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Jakmile se závislé soubory JAR stáhnou, spustí se klient Flink SQLs těmito soubory JAR, které se mají importovat do relace. Dokončete příkaz následujícím způsobem:
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Tyto příkazy spustí klienta SQL se závislostmi jako:
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Vytvoření tabulky CDC Flink PostgreSQL pomocí konektoru CDC
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Validace
Odkaz
- Stránky Apache Flink
- Konektor PostgreSQL CDC je licencován podle licence Apache 2.0
- Názvy projektů Apache, Apache Flink, Flink a přidružených open source projektů jsou ochranné známky Apache Software Foundation (ASF).