Dela via


Ändra datainsamling (CDC) i PostgreSQL-tabellen med Apache Flink®

Viktig

Azure HDInsight på AKS drogs tillbaka den 31 januari 2025. Läs mer genom det här meddelandet.

Du måste migrera dina arbetsbelastningar till Microsoft Fabric- eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar.

Viktig

Den här funktionen är för närvarande i förhandsversion. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS förhandsversionsinformation. För frågor eller funktionsförslag, vänligen skicka en begäran på AskHDInsight med detaljerna och följ oss för fler uppdateringar om Azure HDInsight Community.

Change Data Capture (CDC) är en teknik som du kan använda för att spåra ändringar på radnivå i databastabeller som svar på åtgärder för att skapa, uppdatera och ta bort. I den här artikeln använder vi CDC-kontaktpunkter för Apache Flink®, som erbjuder en uppsättning källkontaktpunkter för Apache Flink. Kopplingarna integrerar Debezium® som motor för att fånga upp dataändringarna.

Flink har stöd för att tolka Debezium JSON- och Avro-meddelanden som INSERT/UPDATE/DELETE-meddelanden i Apache Flink SQL-systemet.

Det här stödet är användbart i många fall för att:

  • Synkronisera inkrementella data från databaser till andra system
  • Granskningsloggar
  • Skapa materialiserade vyer i realtid på databaser
  • Visa ändringshistorik för temporal koppling för en databastabell

Nu ska vi lära oss hur du övervakar ändringar i PostgreSQL-tabellen med hjälp av Flink-SQL CDC. Med PostgreSQL CDC-anslutningsappen kan du läsa ögonblicksbildsdata och inkrementella data från PostgreSQL-databasen.

Förutsättningar

Förbered PostgreSQL-tabellen & Klient

  • Installera PostgreSQL-klienten med hjälp av kommandona nedan med hjälp av en virtuell Linux-dator

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • Installera certifikatet för att ansluta till PostgreSQL-servern med SSL

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • Anslut till servern (ersätt värd, användarnamn och databasnamn i enlighet med detta)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • När du har anslutit till databasen skapar du en exempeltabell

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • Om du vill aktivera CDC på PostgreSQL-databasen måste du göra följande ändringar.

    • WAL-nivån måste ändras till logiska. Det här värdet kan ändras i avsnittet serverparametrar på Azure-portalen.

      Skärmbild som visar hur du aktiverar-cdc-on-postgres-database.

    • Användare som kommer åt tabellen måste ha rollen "REPLIKERing" tillagd

      ÄNDRA ANVÄNDARE <username> MED REPLIKERING;

  • Om du vill skapa tabellen Flink PostgreSQL CDC laddar du ned alla beroende jar-filer. Använd filen pom.xml med följande innehåll.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • Använd maven-kommandot för att ladda ned alla beroende jar-filer

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    Not

  • När de beroende jar-filerna har laddats ned startar Flink SQL-klienten, med dessa jar-filer som ska importeras till sessionen. Slutför kommandot enligt följande.

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    Dessa kommandon startar SQL-klienten med beroendena som,

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • Skapa en Flink PostgreSQL CDC-tabell med hjälp av CDC-anslutningsappen

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

Validering

  • Kör kommandot "select *" för att övervaka ändringarna.

    select * from shipments;

    Skärmbild som visar hur du kör-välj-kommando.

Hänvisning