Sdílet prostřednictvím


Změna zachytávání dat (CDC) tabulky PostgreSQL pomocí Apache Flinku®

Důležitý

Azure HDInsight v AKS byl vyřazen 31. ledna 2025. Zjistěte více v tomto oznámení.

Abyste se vyhnuli náhlému ukončení úloh, musíte migrovat úlohy do Microsoft Fabric nebo ekvivalentního produktu Azure.

Důležitý

Tato funkce je aktuálně ve verzi Preview. doplňkové podmínky použití pro verze Preview Microsoft Azure obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta verzi, ve verzi Preview nebo ještě nebyly vydány v obecné dostupnosti. Informace o této konkrétní ukázce najdete v Azure HDInsight ve verzi preview ve službě AKS. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás pro další aktualizace na komunitu Azure HDInsight .

Change Data Capture (CDC) je technika, kterou můžete použít ke sledování změn na úrovni řádků v databázových tabulkách v reakci na operace vytváření, aktualizace a odstraňování. V tomto článku používáme konektory CDC pro Apache Flink®, které nabízejí sadu zdrojových konektorů pro Apache Flink. Konektory integrují Debezium® jako stroj pro zachycení změn dat.

Flink podporuje interpretaci zpráv Debezium JSON a Avro jako insert/UPDATE/DELETE zpráv do systému Apache Flink SQL.

Tato podpora je užitečná v mnoha případech pro:

  • Synchronizace přírůstkových dat z databází do jiných systémů
  • Protokoly auditu
  • Vytváření materializovaných zobrazení v reálném čase v databázích
  • Zobrazení historie změn časového spojení databázové tabulky

Teď se dozvíme, jak monitorovat změny v tabulce PostgreSQL pomocí Flink-SQL CDC. Konektor PostgreSQL CDC umožňuje čtení dat snímků a přírůstkových dat z databáze PostgreSQL.

Požadavky

Příprava tabulky PostgreSQL & Client

  • Pomocí virtuálního počítače s Linuxem nainstalujte klienta PostgreSQL pomocí následujících příkazů.

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • Instalace certifikátu pro připojení k serveru PostgreSQL pomocí SSL

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • Připojte se k serveru (odpovídajícím způsobem nahraďte hostitele, uživatelské jméno a název databáze).

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • Po úspěšném připojení k databázi vytvořte ukázkovou tabulku.

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • Pokud chcete povolit CDC v databázi PostgreSQL, musíte provést následující změny.

    • Úroveň WAL musí být změněna na logické. Tuto hodnotu je možné změnit v části parametry serveru na webu Azure Portal.

      snímek obrazovky znázorňující, jak povolit funkci CDC v databázi PostgreSQL

    • Uživatel, který přistupuje k tabulce, musí mít přidanou roli REPLIKACE.

      ALTER USER <username> S MOŽNOSTÍ REPLIKACE;

  • Pokud chcete vytvořit tabulku Flink PostgreSQL CDC, stáhněte všechny závislé soubory JAR. Použijte soubor pom.xml s následujícím obsahem.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • Použití příkazu maven ke stažení všech závislých souborů JAR

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    Poznámka

  • Jakmile se závislé soubory JAR stáhnou, spustí se klient Flink SQLs těmito soubory JAR, které se mají importovat do relace. Dokončete příkaz následujícím způsobem:

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    Tyto příkazy spustí klienta SQL se závislostmi jako:

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • Vytvoření tabulky CDC Flink PostgreSQL pomocí konektoru CDC

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

Validace

  • Spuštěním příkazu select *monitorujte změny.

    select * from shipments;

    snímek obrazovky znázorňující, jak spustit příkaz select-command

Odkaz