Zmienianie przechwytywania danych (CDC) tabeli PostgreSQL przy użyciu narzędzia Apache Flink®
Uwaga
Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.
Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.
Ważne
Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.
Funkcja przechwytywania zmian danych (CDC) to technika umożliwiająca śledzenie zmian na poziomie wiersza w tabelach bazy danych w odpowiedzi na operacje tworzenia, aktualizowania i usuwania. W tym artykule używamy łączników CDC dla platformy Apache Flink, które oferują zestaw łączników źródłowych dla platformy Apache Flink®. Łączniki integrują debezium® jako aparat w celu przechwycenia zmian danych.
Funkcja Flink obsługuje interpretowanie komunikatów Debezium JSON i Avro jako komunikatów INSERT/UPDATE/DELETE do systemu Apache Flink SQL.
Ta obsługa jest przydatna w wielu przypadkach w następujących przypadkach:
- Synchronizowanie przyrostowych danych z baz danych do innych systemów
- Dzienniki inspekcji
- Tworzenie zmaterializowanych widoków w czasie rzeczywistym w bazach danych
- Wyświetlanie historii zmiany sprzężenia czasowego tabeli bazy danych
Teraz dowiesz się, jak monitorować zmiany w tabeli PostgreSQL przy użyciu usługi Flink-SQL CDC. Łącznik CDC postgreSQL umożliwia odczytywanie danych migawek i danych przyrostowych z bazy danych PostgreSQL.
Wymagania wstępne
- Serwer elastyczny usługi Azure PostgresSQL w wersji 14.7
- Klaster Apache Flink w usłudze HDInsight w usłudze AKS
- Maszyna wirtualna z systemem Linux do korzystania z klienta PostgreSQL
- Dodaj regułę sieciowej grupy zabezpieczeń, która zezwala na połączenia przychodzące i wychodzące na porcie 5432 w usłudze HDInsight w podsieci puli usługi AKS.
Przygotowywanie tabeli i klienta bazy danych PostgreSQL
Korzystając z maszyny wirtualnej z systemem Linux, zainstaluj klienta PostgreSQL przy użyciu poniższych poleceń
sudo apt-get update sudo apt-get install postgresql-client
Instalowanie certyfikatu w celu nawiązania połączenia z serwerem PostgreSQL przy użyciu protokołu SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Nawiąż połączenie z serwerem (odpowiednio zastąp nazwę hosta, nazwy użytkownika i bazy danych)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Po pomyślnym nawiązaniu połączenia z bazą danych utwórz przykładową tabelę
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Aby włączyć usługę CDC w bazie danych PostgreSQL, musisz wprowadzić następujące zmiany.
Tworzenie tabeli CDC usługi Apache Flink PostgreSQL
Aby utworzyć tabelę CDC Flink PostgreSQL, pobierz wszystkie zależne pliki jar.
pom.xml
Użyj pliku z następującą zawartością.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Użyj polecenia maven, aby pobrać wszystkie zależne pliki jar
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Uwaga
- Jeśli zasobnik ssh w sieci Web nie zawiera narzędzia maven, postępuj zgodnie z linkami, aby pobrać i zainstalować go.
- Aby pobrać plik jar jsr, użyj następującego polecenia
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Po pobraniu zależnych plików jar uruchom klienta SQL Flink, a te pliki jar zostaną zaimportowane do sesji. Ukończ polecenie w następujący sposób,
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Te polecenia uruchamiają klienta SQL z zależnościami w następujący sposób:
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Tworzenie tabeli CDC Flink PostgreSQL przy użyciu łącznika CDC
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Walidacja
Odwołanie
- Witryna internetowa platformy Apache Flink
- Łącznik CDC bazy danych PostgreSQL jest licencjonowany w ramach licencji apache 2.0
- Nazwy projektów apache, Apache Flink, Flink i skojarzone z nimi są znakami towarowymi programu Apache Software Foundation (ASF).