다음을 통해 공유


Apache Flink®를 사용하여 PostgreSQL 테이블의 CDC(변경 데이터 캡처)

중요하다

AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 이 공지 를 통해에 대해 자세히 알아보세요.

워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.

중요하다

이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 미리 보기에 대한 추가 사용 약관에는 베타, 미리 보기 또는 아직 일반 공급으로 출시되지 않은 Azure 기능에 적용되는 더 많은 법적 조건이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보 을 참조하세요. 질문이나 기능 제안이 있으시면, AskHDInsight에 요청을 제출하고, 자세한 내용을 함께 기재해 주세요. 그리고 Azure HDInsight Community를 팔로우하셔서 더 많은 업데이트를 받아보세요.

CDC(변경 데이터 캡처)는 만들기, 업데이트 및 삭제 작업에 대한 응답으로 데이터베이스 테이블의 행 수준 변경 내용을 추적하는 데 사용할 수 있는 기술입니다. 이 문서에서는 Apache Flink®에 대한 원본 커넥터 집합을 제공하는 CDC Connectors for Apache Flink®을 사용합니다. 커넥터는 Debezium® 엔진으로 통합하여 데이터 변경 내용을 캡처합니다.

Flink는 Debezium JSON 및 Avro 메시지를 INSERT/UPDATE/DELETE 메시지로 Apache Flink SQL 시스템으로 해석하도록 지원합니다.

이 지원은 다음과 같은 경우에 유용합니다.

  • 데이터베이스에서 다른 시스템으로 증분 데이터 동기화
  • 감사 로그
  • 데이터베이스에서 실시간 구체화된 뷰 빌드
  • 데이터베이스 테이블의 임시 조인 변경 기록 보기

이제 Flink-SQL CDC를 사용하여 PostgreSQL 테이블의 변경 내용을 모니터링하는 방법을 알아보겠습니다. PostgreSQL CDC 커넥터를 사용하면 PostgreSQL 데이터베이스에서 스냅샷 데이터 및 증분 데이터를 읽을 수 있습니다.

필수 구성 요소

  • Azure PostgresSQL 유연한 서버 버전 14.7
  • AKS에서 HDInsight의 Apache Flink 클러스터
  • PostgreSQL 클라이언트를 사용하는 Linux 가상 머신
  • AKS 풀 서브넷의 HDInsight에서 포트 5432에서 인바운드 및 아웃바운드 연결을 허용하는 NSG 규칙을 추가합니다.

PostgreSQL 테이블 & 클라이언트용 준비

  • Linux 가상 머신을 사용하여 아래 명령을 사용하여 PostgreSQL 클라이언트 설치

    sudo apt-get update
    sudo apt-get install postgresql-client
    
  • SSL을 사용하여 PostgreSQL 서버에 연결하는 인증서 설치

    wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

  • 서버에 연결(그에 따라 호스트, 사용자 이름 및 데이터베이스 이름 바꾸기)

    psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
    
  • 데이터베이스에 성공적으로 연결한 후 샘플 테이블을 만듭니다.

    CREATE TABLE shipments (
        shipment_id SERIAL NOT NULL PRIMARY KEY,
        order_id SERIAL NOT NULL,
        origin VARCHAR(255) NOT NULL,
        destination VARCHAR(255) NOT NULL,
        is_arrived BOOLEAN NOT NULL
      );
      ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
      ALTER TABLE public.shipments REPLICA IDENTITY FULL;
      INSERT INTO shipments
      VALUES (default,10001,'Beijing','Shanghai',false),
         (default,10002,'Hangzhou','Shanghai',false),
         (default,10003,'Shanghai','Hangzhou',false);
    
  • PostgreSQL 데이터베이스에서 CDC를 사용하도록 설정하려면 다음을 변경해야 합니다.

    • WAL 수준을 논리로 변경해야 합니다. 이 값은 Azure Portal의 서버 매개 변수 섹션에서 변경할 수 있습니다.

      PostgreSQL 데이터베이스에서 CDC를 활성화하는 방법을 보여주는 스크린샷

    • 테이블에 액세스하는 사용자에게 'REPLICATION' 역할이 추가되어야 합니다.

      사용자 <username>를 복제와 함께 변경;

  • Flink PostgreSQL CDC 테이블을 만들려면 모든 종속 jar을 다운로드합니다. 다음 내용과 함께 pom.xml 파일을 사용합니다.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        </dependencies>
    </project>
    
  • maven 명령을 사용하여 모든 종속 jar 파일을 다운로드하십시오.

       mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
    

    메모

  • 종속 jar이 다운로드되면 이러한 jar를 세션에 가져오기 위해 Flink SQL 클라이언트를 시작하십시오. 다음과 같이 명령 완료

    /opt/flink-webssh/bin/sql-client.sh -j
    /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
    /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
    /opt/flink-webssh/target/hamcrest-2.1.jar -j
    /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
    /opt/flink-webssh/target/awaitility-4.0.1.jar -j
    /opt/flink-webssh/target/jsr308-all-1.1.2.jar
    

    이러한 명령은 종속성을 사용하여 sql 클라이언트를 다음과 같이 시작합니다.

    user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar 
    
                                      ????????
                                  ????????????????
                               ???????        ??????? ?
                             ????   ?????????      ?????
                             ???         ???????    ?????
                               ???            ???   ?????
                                 ??       ???????????????
                               ?? ?   ???      ?????? ?????
                               ?????   ????     ????? ?????
                            ???????       ???   ??????? ???
                      ????????? ??         ??   ??????????
                     ????????  ??          ?   ?? ???????
                   ????  ???           ?  ?? ???????? ?????
                  ???? ? ??          ? ?? ????????    ???? ??
                 ???? ????          ??????????       ??? ?? ????
              ???? ?? ???       ???????????         ???? ? ?  ???
              ??? ?? ??? ?????????             ????           ???
              ??   ? ???????             ????????          ??? ??
              ???    ???   ????????????????????           ????  ?
             ????? ???   ??????  ????????                 ????  ??
             ????????  ???????????????                            ??
             ?? ????   ??????? ???       ??????    ??         ???
             ??? ???  ??? ???????            ????   ?????????????
              ??? ?????  ???? ??                ??      ????  ???
              ??  ???   ?     ??                ??              ??
               ??  ??         ??                 ??        ????????
                ?? ?????       ??                  ???????????    ??
                 ??   ????     ?                    ???????      ??
                  ???   ?????                         ?? ???????????
                   ????    ????                     ??????? ????????
                     ?????                          ??  ???? ?????
                        ????????????????????????????????? ?????
    
       ______ _ _       _      _____  ____  _        _____ _ _            _  BETA   
      | ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | | 
      | |__ | |_ _ __ | | __ | (___ | |  | | |      | |    | |_ ___ _ __ | |_ 
      |  __| | | | '_ \| |/ /  \___ \| |  | | |     | |    | | |/ _ \ '_ \| __|
      | |   | | | | | |   <   ____) | |__| | |____  | |____| | | __/ | | | |_ 
      |_|   |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
    
           Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/xcao/.flink-sql-history
    
    Flink SQL>
    
  • CDC 커넥터를 사용하여 Flink PostgreSQL CDC 테이블 만들기

    CREATE TABLE shipments (
      shipment_id INT,
      order_id INT,
      origin STRING,
      destination STRING,
      is_arrived BOOLEAN,
      PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'flinkpostgres.postgres.database.azure.com',
      'port' = '5432',
      'username' = 'username',
      'password' = 'password',
      'database-name' = 'postgres',
      'schema-name' = 'public',
      'table-name' = 'shipments',
      'decoding.plugin.name' = 'pgoutput',
      'slot.name' = 'flink'
    );
    

유효성 검사

  • 'select *' 명령을 실행하여 변경 내용을 모니터링합니다.

    select * from shipments;

    실행 명령을 선택하는 방법을 보여주는 스크린샷.

참조