Zmienianie przechwytywania danych (CDC) tabeli PostgreSQL przy użyciu narzędzia Apache Flink®
Ważny
Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej z tego ogłoszenia.
Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.
Ważny
Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe Warunki Użytkowania Platformy Microsoft Azure zawierają dodatkowe warunki prawne, które odnoszą się do funkcji platformy Azure dostępnych w wersji beta, zapoznawczej lub w inny sposób jeszcze nie zostały ogólnie udostępnione. Aby uzyskać informacje na temat tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej Azure HDInsight na AKS. W przypadku pytań lub sugestii dotyczących funkcji, wyślij zapytanie na AskHDInsight z podaniem szczegółów i śledź nas, aby uzyskać więcej aktualizacji w społeczności Azure HDInsight.
Funkcja przechwytywania zmian danych (CDC) to technika umożliwiająca śledzenie zmian na poziomie wiersza w tabelach bazy danych w odpowiedzi na operacje tworzenia, aktualizowania i usuwania. W tym artykule używamy łączników CDC dlaApache Flink®, które oferują zestaw łączników źródłowych dla platformy Apache Flink. Łączniki integrują Debezium® jako silnik do przechwytywania zmian danych.
Funkcja Flink obsługuje interpretowanie komunikatów Debezium JSON i Avro jako komunikatów INSERT/UPDATE/DELETE do systemu Apache Flink SQL.
Ta obsługa jest przydatna w wielu przypadkach w następujących przypadkach:
- Synchronizowanie przyrostowych danych z baz danych do innych systemów
- Dzienniki inspekcji
- Tworzenie zmaterializowanych widoków w czasie rzeczywistym w bazach danych
- Wyświetl historię zmian połączenia temporalnego tabeli bazy danych
Teraz dowiesz się, jak monitorować zmiany w tabeli PostgreSQL przy użyciu usługi Flink-SQL CDC. Łącznik CDC PostgreSQL umożliwia odczytywanie danych migawek i danych inkrementalnych z bazy danych PostgreSQL.
Warunki wstępne
- serwera elastycznego Azure PostgresSQL w wersji 14.7
- klaster Apache Flink w usłudze HDInsight w usłudze AKS
- Maszyna wirtualna z systemem Linux do korzystania z klienta PostgreSQL
- Dodaj regułę grupy zabezpieczeń sieciowych, która zezwala na połączenia przychodzące i wychodzące na port 5432 w usłudze HDInsight w podsieci puli AKS.
Przygotowywanie tabeli PostgreSQL & Client
Korzystając z maszyny wirtualnej z systemem Linux, zainstaluj klienta PostgreSQL przy użyciu poniższych poleceń
sudo apt-get update sudo apt-get install postgresql-client
Instalowanie certyfikatu w celu nawiązania połączenia z serwerem PostgreSQL przy użyciu protokołu SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Nawiąż połączenie z serwerem (odpowiednio zastąp nazwę hosta, nazwy użytkownika i bazy danych)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Po pomyślnym nawiązaniu połączenia z bazą danych utwórz przykładową tabelę
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Aby włączyć usługę CDC w bazie danych PostgreSQL, musisz wprowadzić następujące zmiany.
Tworzenie tabeli CDC usługi Apache Flink PostgreSQL
Aby utworzyć tabelę CDC Flink PostgreSQL, pobierz wszystkie zależne pliki JAR. Użyj pliku
pom.xml
z następującą zawartością.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Użyj polecenia maven, aby pobrać wszystkie zależne pliki jar
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Notatka
- Jeśli zasobnik ssh w sieci Web nie zawiera narzędzia maven, postępuj zgodnie z linkami, aby pobrać i zainstalować go.
- Aby pobrać plik jar jsr, użyj następującego polecenia
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Po pobraniu zależnych jar-ów, uruchom Flink SQL client, aby te jar-y zostały zaimportowane do sesji. Ukończ polecenie w następujący sposób,
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Te polecenia uruchamiają klienta SQL z zależnościami w następujący sposób:
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Tworzenie tabeli CDC Flink PostgreSQL przy użyciu łącznika CDC
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Walidacja
Odniesienie
- Witryna internetowa Apache Flink
- Łącznik CDC PostgreSQL jest licencjonowany na licencji Apache 2.0
- Nazwy projektów typu open source Apache, Apache Flink, Flink i skojarzone są znakami towarowymiFundacji Apache Software (ASF).