Apache Flink®를 사용하여 PostgreSQL 테이블의 CDC(변경 데이터 캡처)
중요하다
AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 이 공지 를 통해에 대해 자세히 알아보세요.
워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.
중요하다
이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 미리 보기에 대한 추가 사용 약관에는 베타, 미리 보기 또는 아직 일반 공급으로 출시되지 않은 Azure 기능에 적용되는 더 많은 법적 조건이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보 을 참조하세요. 질문이나 기능 제안이 있으시면, AskHDInsight에 요청을 제출하고, 자세한 내용을 함께 기재해 주세요. 그리고 Azure HDInsight Community를 팔로우하셔서 더 많은 업데이트를 받아보세요.
CDC(변경 데이터 캡처)는 만들기, 업데이트 및 삭제 작업에 대한 응답으로 데이터베이스 테이블의 행 수준 변경 내용을 추적하는 데 사용할 수 있는 기술입니다. 이 문서에서는 Apache Flink®에 대한 원본 커넥터 집합을 제공하는 CDC Connectors for Apache Flink®을 사용합니다. 커넥터는 Debezium® 엔진으로 통합하여 데이터 변경 내용을 캡처합니다.
Flink는 Debezium JSON 및 Avro 메시지를 INSERT/UPDATE/DELETE 메시지로 Apache Flink SQL 시스템으로 해석하도록 지원합니다.
이 지원은 다음과 같은 경우에 유용합니다.
- 데이터베이스에서 다른 시스템으로 증분 데이터 동기화
- 감사 로그
- 데이터베이스에서 실시간 구체화된 뷰 빌드
- 데이터베이스 테이블의 임시 조인 변경 기록 보기
이제 Flink-SQL CDC를 사용하여 PostgreSQL 테이블의 변경 내용을 모니터링하는 방법을 알아보겠습니다. PostgreSQL CDC 커넥터를 사용하면 PostgreSQL 데이터베이스에서 스냅샷 데이터 및 증분 데이터를 읽을 수 있습니다.
필수 구성 요소
- Azure PostgresSQL 유연한 서버 버전 14.7
- AKS에서 HDInsight의 Apache Flink 클러스터
- PostgreSQL 클라이언트를 사용하는 Linux 가상 머신
- AKS 풀 서브넷의 HDInsight에서 포트 5432에서 인바운드 및 아웃바운드 연결을 허용하는 NSG 규칙을 추가합니다.
PostgreSQL 테이블 & 클라이언트용 준비
Linux 가상 머신을 사용하여 아래 명령을 사용하여 PostgreSQL 클라이언트 설치
sudo apt-get update sudo apt-get install postgresql-client
SSL을 사용하여 PostgreSQL 서버에 연결하는 인증서 설치
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
서버에 연결(그에 따라 호스트, 사용자 이름 및 데이터베이스 이름 바꾸기)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
데이터베이스에 성공적으로 연결한 후 샘플 테이블을 만듭니다.
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
PostgreSQL 데이터베이스에서 CDC를 사용하도록 설정하려면 다음을 변경해야 합니다.
Apache Flink PostgreSQL CDC 테이블 만들기
Flink PostgreSQL CDC 테이블을 만들려면 모든 종속 jar을 다운로드합니다. 다음 내용과 함께
pom.xml
파일을 사용합니다.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
maven 명령을 사용하여 모든 종속 jar 파일을 다운로드하십시오.
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
메모
- 웹 ssh Pod에 maven이 포함되어 있지 않으면 링크를 따라 다운로드하여 설치하세요.
- jsr jar 파일을 다운로드하려면 다음 명령을 사용합니다.
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
종속 jar이 다운로드되면 이러한 jar를 세션에 가져오기 위해 Flink SQL 클라이언트를 시작하십시오. 다음과 같이 명령 완료
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
이러한 명령은 종속성을 사용하여 sql 클라이언트를 다음과 같이 시작합니다.
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
CDC 커넥터를 사용하여 Flink PostgreSQL CDC 테이블 만들기
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
유효성 검사
참조
- Apache Flink 웹사이트
- PostgreSQL CDC ConnectorApache 2.0 라이선스 따라 라이선스가 부여됩니다.
- Apache, Apache Flink, Flink 및 관련 오픈 소스 프로젝트 이름은 ASF(Apache Software Foundation)의 상표입니다.