CDC(변경 데이터 캡처)를 사용하여 Azure SQL Managed Instance에서 Azure Storage로 데이터 증분 로드
적용 대상: Azure Data Factory Azure Synapse Analytics
팁
기업용 올인원 분석 솔루션인 Microsoft Fabric의 Data Factory를 사용해 보세요. Microsoft Fabric은 데이터 이동부터 데이터 과학, 실시간 분석, 비즈니스 인텔리전스 및 보고에 이르기까지 모든 것을 다룹니다. 무료로 새 평가판을 시작하는 방법을 알아봅니다!
이 자습서에서는 원본 Azure SQL Managed Instance 데이터베이스의 CDC(변경 데이터 캡처) 정보를 기반으로 Azure Blob Storage에 델타 데이터를 로드하는 파이프라인이 있는 Azure 데이터 팩터리를 만듭니다.
이 자습서에서 수행하는 단계는 다음과 같습니다.
- 원본 데이터 저장소를 준비합니다.
- 데이터 팩터리를 만듭니다.
- 연결된 서비스만들기.
- 원본 및 싱크 데이터 세트를 만듭니다.
- 파이프라인을 만들고 디버그하고 실행하여 변경된 데이터를 확인합니다.
- 원본 테이블의 데이터를 수정합니다.
- 증분 복사 파이프라인을 생성, 실행 및 모니터링합니다.
개요
Azure SQL MI(Managed Instances) 및 SQL Server 같은 데이터 저장소에서 지원되는 변경 데이터 캡처 기술은 변경된 데이터를 식별하는 데 사용할 수 있습니다. 이 자습서는 SQL 변경 데이터 캡처 기술을 통해 Azure Data Factory를 사용하여 Azure SQL Managed Instance에서 Azure Blob Storage로 델타 데이터를 증분 로드하는 방법을 설명합니다. SQL 변경 데이터 캡처 기술에 대한 자세한 내용은 SQL Server의 변경 데이터 캡처를 참조하세요.
엔드투엔드 워크플로
다음은 변경 데이터 캡처 기술을 사용하여 데이터를 증분 로드하는 일반적인 엔드투엔드 워크플로 단계입니다.
참고 항목
Azure SQL MI 및 SQL Server는 모두 변경 데이터 캡처 기술을 지원합니다. 이 자습서는 Azure SQL Managed Instance를 원본 데이터 저장소로 사용합니다. 온-프레미스 SQL Server를 사용할 수도 있습니다.
대략적인 솔루션
이 자습서에서는 다음 작업을 수행하는 파이프라인을 만듭니다.
- SQL Database CDC 테이블에서 변경된 레코드 수를 계산하고 IF 조건 작업에 전달하는 조회 작업을 만듭니다.
- 변경된 레코드가 있는지 여부를 확인하기 위해 If 조건을 만들고, 해당하는 경우 복사 작업을 호출합니다.
- 복사 작업 만들어서 CDC 테이블 사이의 삽입/업데이트/삭제된 데이터를 Azure Blob Storage에 복사합니다.
Azure 구독이 아직 없는 경우 시작하기 전에 체험 계정을 만듭니다.
사전 요구 사항
- Azure SQL Managed Instance. 데이터베이스를 원본 데이터 저장소로 사용합니다. Azure SQL Managed Instance가 없는 경우 Azure SQL Database Managed Instance 만들기 문서를 참조하세요.
- Azure Storage 계정입니다. Blob Storage를 싱크 데이터 스토리지로 사용합니다. 아직 없는 경우 Azure Storage 계정을 만드는 단계는 스토리지 계정 만들기 문서를 참조하세요. 원시라는 컨테이너를 만들려면:
Azure SQL Database에 데이터 원본 테이블 만들기
SQL Server Management Studio를 시작하고 Azure SQL Managed Instances 서버에 연결합니다.
서버 탐색기에서 데이터베이스를 마우스 오른쪽 단추로 클릭하고 새 쿼리를 선택합니다.
Azure SQL Managed Instances 데이터베이스에 대해 다음 SQL 명령을 실행하여 데이터 원본 저장소로
customers
이라는 테이블을 만듭니다.create table customers ( customer_id int, first_name varchar(50), last_name varchar(50), email varchar(100), city varchar(50), CONSTRAINT "PK_Customers" PRIMARY KEY CLUSTERED ("customer_id") );
다음 SQL 쿼리를 실행하여 데이터베이스 및 원본 테이블(고객)에서 변경 데이터 캡처 메커니즘을 사용하도록 설정합니다.
참고 항목
- <원본 스키마 이름>을 고객 테이블이 있는 Azure SQL MI의 스키마로 바꿉니다.
- 변경 데이터 캡처는 추적 중인 테이블을 변경하는 트랜잭션의 일부로써 어떤 작업도 수행하지 않습니다. 대신 삽입, 업데이트 및 삭제 작업이 트랜잭션 로그에 기록됩니다. 변경 테이블에 보관되어 있는 데이터는 주기적이며 체계적으로 정리하지 않으면 관리할 수 없을 정도로 커집니다. 자세한 내용은 데이터베이스에 대한 변경 데이터 캡처 사용을 참조하세요.
EXEC sys.sp_cdc_enable_db EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 1
다음 명령을 실행하여 고객 테이블에 데이터를 삽입합니다.
insert into customers (customer_id, first_name, last_name, email, city) values (1, 'Chevy', 'Leward', 'cleward0@mapy.cz', 'Reading'), (2, 'Sayre', 'Ateggart', 'sateggart1@nih.gov', 'Portsmouth'), (3, 'Nathalia', 'Seckom', 'nseckom2@blogger.com', 'Portsmouth');
참고 항목
변경 데이터 캡처를 사용하도록 설정하기 전에는 테이블에 대한 기록 변경 내용이 캡처되지 않습니다.
데이터 팩터리 만들기
사용할 데이터 팩터리가 아직 없는 경우 빠른 시작: Azure Portal을 사용하여 데이터 팩터리 만들기문서의 단계를 수행합니다.
연결된 서비스 생성
데이터 팩터리에서 연결된 서비스를 만들어 데이터 저장소를 연결하고 컴퓨팅 서비스를 데이터 팩터리에 연결합니다. 이 섹션에서는 Azure Storage 계정과 Azure SQL MI에 연결된 서비스를 만듭니다.
Azure Storage 연결된 서비스를 만듭니다.
이 단계에서는 Azure Storage 계정을 데이터 팩터리에 연결합니다.
연결, + 새로 만들기를 차례로 클릭합니다.
새 연결된 서비스 창에서 Azure Blob Storage를 선택하고 계속을 클릭합니다.
새 연결된 서비스 창에서 다음 단계를 수행합니다.
- 이름에 대해 AzureStorageLinkedService를 입력합니다.
- 스토리지 계정 이름에 대해 Azure Storage 계정을 선택합니다.
- 저장을 클릭합니다.
Azure SQL MI Database 연결 서비스를 만듭니다.
이 단계에서는 Azure SQL MI 데이터베이스를 데이터 팩터리에 연결합니다.
참고 항목
SQL MI를 사용하는 경우 공용 vs 프라이빗 엔드포인트를 통해 액세스하는 방법에 대한 자세한 내용은 여기를 참조하세요. 프라이빗 엔드포인트를 사용하는 경우 자체 호스팅 통합 런타임을 사용하여 이 파이프라인을 실행해야 합니다. VM 또는 VNet 시나리오에서 온-프레미스를 실행하는 SQL Server에도 동일하게 적용됩니다.
연결, + 새로 만들기를 차례로 클릭합니다.
새 연결된 서비스 창에서 Azure SQL Database Managed Instance를 선택하고 계속을 클릭합니다.
새 연결된 서비스 창에서 다음 단계를 수행합니다.
- 이름 필드에 AzureSqlMI1을 입력합니다.
- 서버 이름 필드에 대한 SQL 서버를 선택합니다.
- 데이터베이스 이름 필드에 대한 SQL 데이터베이스를 선택합니다.
- 사용자 이름 필드에 대해 사용자의 이름을 입력합니다.
- 암호 필드에 대해 사용자의 암호를 입력합니다.
- 연결 테스트를 클릭하여 연결을 테스트합니다.
- 저장을 클릭하여 연결된 서비스를 저장합니다.
데이터 세트 생성
이 단계에서는 데이터 원본 및 데이터 대상을 나타내는 데이터 세트를 만듭니다.
원본 데이터를 나타내는 데이터 세트 만들기
이 단계에서는 원본 데이터를 나타내는 데이터 세트를 만듭니다.
트리 뷰에서 +(더하기), 데이터 세트를 차례로 클릭합니다.
Azure SQL Database Managed Instance를 선택하고 계속을 클릭합니다.
Set 속성 탭에서 데이터 세트 이름 및 연결 정보를 설정합니다.
- 연결된 서비스에 대해 AzureSqlMI1를 선택합니다.
- 테이블 이름에 대한 [dbo].[dbo_customers_CT]를 선택합니다. 참고: CDC를 고객 테이블에서 실행할 때 이 테이블이 자동으로 생성되었습니다. 변경된 데이터는 이 테이블에서 직접 쿼리하지 않지만CDC 함수를 통해 추출됩니다.
싱크 데이터 저장소에 복사된 데이터를 나타내는 데이터 세트 만들기
이 단계에서는 원본 데이터 저장소에서 복사된 데이터를 나타내는 데이터 세트를 만듭니다. 필수 구성 요소의 일부로써 데이터 레이크 컨테이너를 Azure Blob Storage에 만들었습니다. 아직 없는 경우 컨테이너를 만들거나 기존 컨테이너의 이름으로 설정합니다. 이 자습서에서 출력 파일 이름은 나중에 구성될 트리거 시간을 사용하여 동적으로 생성됩니다.
트리 뷰에서 +(더하기), 데이터 세트를 차례로 클릭합니다.
Azure Blob Storage를 선택하고 선택을 클릭합니다.
DelimitedText를 선택하고 계속을 클릭합니다.
Set 속성 탭에서 데이터 세트 이름 및 연결 정보를 설정합니다.
- 연결된 서비스에 대해 AzureStorageLinkedService를 선택합니다.
- filePath의 컨테이너 부분에 원시를 입력합니다.
- 첫 번째 행을 헤더로 사용하십시오.
- 확인을 클릭합니다.
변경된 데이터를 복사하는 파이프라인 만들기
이 단계에서는 조회 작업을 사용하여 변경 테이블에 있는 변경된 레코드의 수를 먼저 확인하는 파이프라인을 만듭니다. IF 조건 작업은 변경된 레코드 수가 0보다 큰지 여부를 확인하고 복사 작업을 실행하여 Azure SQL Database에서 삽입/업데이트/삭제된 데이터를 Azure Blob Storage로 복사합니다. 마지막으로, 연속 창 트리거가 구성되고 시작 및 종료 시간이 작업에 시작 및 종료 창 매개 변수로 전달됩니다.
Data Factory UI에서 편집 탭으로 전환합니다. 왼쪽 창에서 +(더하기), 파이프라인을 차례로 클릭합니다.
파이프라인을 구성하기 위한 새 탭이 표시됩니다. 또한 트리 뷰에도 파이프라인이 표시됩니다. 속성 창에서 파이프라인 이름을 IncrementalCopyPipeline으로 변경합니다.
활동 도구 상자에서 일반을 펼치고, 조회 활동을 파이프라인 디자이너 화면으로 끌어서 놓습니다. 활동 이름을 GetChangeCount로 설정합니다. 이 활동은 지정된 시간 창에 대한 변경 테이블의 레코드 수를 가져옵니다.
속성 창의 설정으로 전환합니다.
원본 데이터 세트 필드에 대해 SQL MI 데이터 세트 이름을 지정합니다.
쿼리 옵션을 선택하고 쿼리 상자에 다음을 입력합니다.
DECLARE @from_lsn binary(10), @to_lsn binary(10); SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers'); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE()); SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
- 첫 행만 사용
데이터 미리 보기 단추를 클릭하여 조회 작업에서 유효한 출력을 가져왔는지 확인합니다.
활동 도구 상자에서 반복 및 조건부를 펼치고, If 조건부 작업을 파이프라인 디자이너 화면으로 끌어서 놓습니다. 작업 이름을 HasChangedRows로 설정합니다.
속성 창에서 작업으로 전환합니다.
- 다음 식을 입력합니다.
@greater(int(activity('GetChangeCount').output.firstRow.changecount),0)
- 연필 아이콘을 클릭하여 True 조건을 편집합니다.
- 활동 도구 상자에서 일반을 펼치고, 대기 활동을 파이프라인 디자이너 화면으로 끌어서 놓습니다. 이 작업은 If 조건을 디버깅하기 위한 임시 작업이며 자습서의 뒷부분에서 변경됩니다.
- IncrementalCopyPipeline 이동 경로를 클릭하여 주 파이프라인으로 돌아갑니다.
디버그 모드에서 파이프라인을 실행하여 파이프라인이 성공적으로 실행되는지 확인합니다.
그런 다음 True 조건 단계로 돌아가서 대기 작업을 삭제합니다. 활동 도구 상자에서 이동 & 전환을 펼치고, 복사 활동을 파이프라인 디자이너 화면으로 끌어서 놓습니다. 활동 이름을 IncrementalCopyActivity로 설정합니다.
속성 창에서 원본으로 전환하고 다음 단계를 수행합니다.
원본 데이터 세트 필드에 대해 SQL MI 데이터 세트 이름을 지정합니다.
쿼리 사용에 대해 쿼리를 선택합니다.
쿼리에 다음을 입력합니다.
DECLARE @from_lsn binary(10), @to_lsn binary(10); SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers'); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE()); SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
미리 보기를 클릭하여 쿼리가 변경된 행을 올바르게 반환하는지 확인합니다.
싱크 탭으로 전환하고 싱크 데이터 세트 필드의 Azure Storage 데이터 세트를 지정합니다.
주 파이프라인 캔버스로 돌아가기를 클릭하고 조회 활동을 If 조건 활동에 하나씩 연결합니다. 조회 활동에 연결된 녹색 단추를 If 조건 활동으로 끕니다.
도구 모음에서 유효성 검사를 클릭합니다. 유효성 검사 오류가 없는지 확인합니다. >>를 클릭하여 파이프라인 유효성 검사 보고서 창을 닫습니다.
디버그를 클릭하여 파이프라인을 테스트하고 파일이 스토리지 위치에 생성되었는지 확인합니다.
모두 게시 단추를 클릭하여 엔터티(연결된 서비스, 데이터 세트 및 파이프라인)를 Data Factory 서비스에 게시합니다. 게시 성공 메시지가 표시될 때까지 기다립니다.
연속 창 트리거 및 CDC 창 매개 변수를 구성합니다.
이 단계에서는 빈번한 일정에 따라 작업을 실행하는 연속 창 트리거를 만듭니다. 연속 창 트리거의 WindowStart 및 WindowEnd 시스템 변수를 사용하여 CDC 쿼리에 사용할 파이프라인에 매개 변수로 전달합니다.
IncrementalCopyPipeline 파이프라인의 매개 변수 탭으로 이동하고 + 새 단추를 사용하여 파이프라인에 두 개의 매개 변수(triggerStartTime 및 triggerEndTime)를 추가합니다. 이는 연속 창 시작 및 종료 시간을 나타냅니다. 디버깅을 위해 YYYY-MM-DD HH24:MI:SS.FFF 형식의 기본값을 추가합니다. 하지만 테이블에서 triggerStartTime이 CDC를 사용하도록 설정되어 있지 않은지 확인합니다. 그렇지 않으면 오류가 발생합니다.
조회 작업의 설정 탭을 클릭하고 시작 및 종료 매개 변수를 사용하도록 쿼리를 구성합니다. 다음을 쿼리에 복사합니다.
@concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = ''',pipeline().parameters.triggerStartTime,'''; SET @end_time = ''',pipeline().parameters.triggerEndTime,'''; SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time); SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
If 조건 활동이 True이면 복사 작업으로 이동하고 원본 탭을 클릭합니다. 다음을 쿼리에 복사합니다.
@concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = ''',pipeline().parameters.triggerStartTime,'''; SET @end_time = ''',pipeline().parameters.triggerEndTime,'''; SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time); SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
복사 작업의 싱크 탭을 클릭하고 열기를 클릭하여 데이터 세트 속성을 편집합니다. 매개 변수 탭을 클릭하고 triggerStart 라는 새 매개 변수를 추가합니다.
그런 다음 날짜 기반 파티션이 있는 고객/증분 하위 디렉터리에 데이터를 저장하도록 데이터 세트 속성을 구성합니다.
데이터 세트 속성의 연결 탭을 클릭하고 디렉터리 및 파일 섹션 모두에 동적 콘텐츠를 추가합니다.
텍스트 상자 아래의 동적 콘텐츠 링크를 클릭하여 디렉터리 섹션에 다음 식을 입력합니다.
@concat('customers/incremental/',formatDateTime(dataset().triggerStart,'yyyy/MM/dd'))
파일 섹션에 다음 식을 입력합니다. 이렇게 하면 트리거 시작 날짜와 시간을 기준으로 파일 이름이 생성되고 그 뒤에 csv 확장명이 붙습니다.
@concat(formatDateTime(dataset().triggerStart,'yyyyMMddHHmmssfff'),'.csv')
IncrementalCopyPipeline 탭을 클릭하여복사 작업의 싱크 설정으로 다시 이동합니다.
데이터 세트 속성을 확장하고 다음 식을 사용하여 triggerStart 매개 변수 값에 동적 콘텐츠를 입력합니다.
@pipeline().parameters.triggerStartTime
디버그를 클릭하여 파이프라인을 테스트하고 폴더 구조와 출력 파일이 예상대로 생성되는지 확인합니다. 콘텐츠를 확인하려면 파일을 다운로드하여 엽니다.
파이프라인 실행의 입력 매개 변수를 검토하여 매개 변수가 쿼리에 삽입되는지 확인합니다.
모두 게시 단추를 클릭하여 엔터티(연결된 서비스, 데이터 세트 및 파이프라인)를 Data Factory 서비스에 게시합니다. 게시 성공 메시지가 표시될 때까지 기다립니다.
마지막으로 연속 창 트리거를 구성하여 일정한 간격으로 파이프라인을 실행하고 시작 및 종료 시간 매개 변수를 설정합니다.
- 트리거 추가 단추를 클릭하고 새로 만들기/편집을 선택합니다.
- 트리거 이름을 입력하고 시작 시간을 지정합니다. 이 시간은 위의 디버그 창 종료 시간과 같습니다.
다음 화면에서 시작 및 종료 매개 변수에 대한 다음 값을 각각 지정합니다.
@formatDateTime(trigger().outputs.windowStartTime,'yyyy-MM-dd HH:mm:ss.fff') @formatDateTime(trigger().outputs.windowEndTime,'yyyy-MM-dd HH:mm:ss.fff')
참고 항목
트리거는 게시된 후에만 실행됩니다. 또한 연속 창의 예상 동작은 시작 날짜부터 지금까지 모든 기록 간격을 실행하는 것입니다. 연속 창 트리거에 대한 자세한 내용은 여기에서 찾을 수 있습니다.
SQL Server Management Studio를 사용하여 다음 SQL을 실행하면 고객 테이블을 약간 추가로 변경할 수 있습니다.
insert into customers (customer_id, first_name, last_name, email, city) values (4, 'Farlie', 'Hadigate', 'fhadigate3@zdnet.com', 'Reading'); insert into customers (customer_id, first_name, last_name, email, city) values (5, 'Anet', 'MacColm', 'amaccolm4@yellowbook.com', 'Portsmouth'); insert into customers (customer_id, first_name, last_name, email, city) values (6, 'Elonore', 'Bearham', 'ebearham5@ebay.co.uk', 'Portsmouth'); update customers set first_name='Elon' where customer_id=6; delete from customers where customer_id=5;
모두 게시 단추를 클릭합니다. 게시 성공 메시지가 표시될 때까지 기다립니다.
몇 분 후 파이프라인이 트리거되고 새 파일이 Azure Storage에 로드됩니다.
증분 복사 파이프라인 모니터링
왼쪽의 모니터 탭을 클릭합니다. 목록에 파이프라인 실행 및 해당 상태가 표시됩니다. 목록을 새로 고치려면 새로 고침을 클릭합니다. 파이프라인 이름 근처를 가리켜 다시 실행 작업 및 소비 보고서에 액세스합니다.
파이프라인 실행과 연결된 활동 실행을 보려면 파이프라인 이름을 클릭합니다. 변경된 데이터가 검색되면 복사 작업을 포함하는 세 가지 작업이 있습니다. 그렇지 않으면 목록에 두 개의 항목만 있습니다. 파이프라인 실행 보기로 다시 전환하려면 위쪽의 모든 파이프라인 링크를 클릭합니다.
결과 검토
raw
컨테이너의 customers/incremental/YYYY/MM/DD
폴더에 두 번째 파일이 표시됩니다.
관련 콘텐츠
다음 자습서로 넘어가서 해당 LastModifiedDate를 기준으로 새로운 파일과 변경된 파일을 복사하는 방법에 대해 알아보세요.