Udostępnij za pośrednictwem


Zmienianie przechwytywania danych (CDC) tabeli PostgreSQL przy użyciu narzędzia Apache Flink®

Uwaga

Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.

Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.

Ważne

Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.

Funkcja przechwytywania zmian danych (CDC) to technika umożliwiająca śledzenie zmian na poziomie wiersza w tabelach bazy danych w odpowiedzi na operacje tworzenia, aktualizowania i usuwania. W tym artykule używamy łączników CDC dla platformy Apache Flink, które oferują zestaw łączników źródłowych dla platformy Apache Flink®. Łączniki integrują debezium® jako aparat w celu przechwycenia zmian danych.

Funkcja Flink obsługuje interpretowanie komunikatów Debezium JSON i Avro jako komunikatów INSERT/UPDATE/DELETE do systemu Apache Flink SQL.

Ta obsługa jest przydatna w wielu przypadkach w następujących przypadkach:

  • Synchronizowanie przyrostowych danych z baz danych do innych systemów
  • Dzienniki inspekcji
  • Tworzenie zmaterializowanych widoków w czasie rzeczywistym w bazach danych
  • Wyświetlanie historii zmiany sprzężenia czasowego tabeli bazy danych

Teraz dowiesz się, jak monitorować zmiany w tabeli PostgreSQL przy użyciu usługi Flink-SQL CDC. Łącznik CDC postgreSQL umożliwia odczytywanie danych migawek i danych przyrostowych z bazy danych PostgreSQL.

Wymagania wstępne

Przygotowywanie tabeli i klienta bazy danych PostgreSQL

  • Korzystając z maszyny wirtualnej z systemem Linux, zainstaluj klienta PostgreSQL przy użyciu poniższych poleceń

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • Instalowanie certyfikatu w celu nawiązania połączenia z serwerem PostgreSQL przy użyciu protokołu SSL

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • Nawiąż połączenie z serwerem (odpowiednio zastąp nazwę hosta, nazwy użytkownika i bazy danych)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • Po pomyślnym nawiązaniu połączenia z bazą danych utwórz przykładową tabelę

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • Aby włączyć usługę CDC w bazie danych PostgreSQL, musisz wprowadzić następujące zmiany.

    • Poziom WAL musi zostać zmieniony na logiczny. Tę wartość można zmienić w sekcji parametry serwera w witrynie Azure Portal.

      Zrzut ekranu przedstawiający sposób włączania bazy danych cdc-on-postgres-database.

    • Użytkownik, który uzyskuje dostęp do tabeli, musi mieć dodaną rolę "REPLIKACJA"

      ZMIENIANIE UŻYTKOWNIKA <username> ZA POMOCĄ REPLIKACJI;

  • Aby utworzyć tabelę CDC Flink PostgreSQL, pobierz wszystkie zależne pliki jar. pom.xml Użyj pliku z następującą zawartością.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • Użyj polecenia maven, aby pobrać wszystkie zależne pliki jar

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    Uwaga

  • Po pobraniu zależnych plików jar uruchom klienta SQL Flink, a te pliki jar zostaną zaimportowane do sesji. Ukończ polecenie w następujący sposób,

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    Te polecenia uruchamiają klienta SQL z zależnościami w następujący sposób:

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • Tworzenie tabeli CDC Flink PostgreSQL przy użyciu łącznika CDC

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

Walidacja

  • Uruchom polecenie "select *", aby monitorować zmiany.

    select * from shipments;

    Zrzut ekranu przedstawiający sposób uruchamiania polecenia select-command.

Odwołanie