다음을 통해 공유


Data Warehouse에서 Lakehouse로 데이터 증분 로드

이 자습서에서는 데이터 웨어하우스에서 Lakehouse로 데이터를 증분 로드하는 방법을 알아봅니다.

개요

상위 수준 솔루션 다이어그램은 다음과 같습니다.

증분 로드 데이터 로직을 보여주는 다이어그램.

이 솔루션을 만드는 중요한 단계는 다음과 같습니다.

  1. 워터마크 열을 선택합니다. 원본 데이터 테이블에서 하나의 열을 선택합니다. 이 열은 모든 실행에 대해 새 레코드 또는 업데이트된 레코드를 분할하는 데 사용할 수 있습니다. 선택한 이 열의 데이터(예: last_modify_time 또는 ID)는 일반적으로 행을 만들거나 업데이트할 때 계속 증가합니다. 이 열의 최대 값은 워터마크로 사용됩니다.

  2. Data Warehouse에 마지막 워터마크 값을 저장할 테이블을 준비합니다.

  3. 다음 워크플로를 사용하여 파이프라인을 만듭니다.

    이 솔루션의 파이프라인에 포함되는 작업은 다음과 같습니다.

    • 두 가지 조회 작업을 만듭니다. 첫 번째 조회 작업을 사용하여 마지막 워터마크 값을 검색합니다. 두 번째 조회 작업을 사용하여 새 워터마크 값을 검색합니다. 이러한 워터마크 값은 복사 작업에 전달됩니다.
    • 이전 워터마크 값보다 크고, 새 워터마크 값보다 작은 워터마크 열 값으로 원본 데이터 저장소의 행을 복사하는 복사 작업을 만듭니다. 그런 다음 Data Warehouse에서 Lakehouse로 데이터를 새 파일로 복사합니다.
    • 다음에 실행되는 파이프라인에 대한 워터마크 값을 마지막으로 업데이트하는 저장 프로시저 활동을 만듭니다.

필수 조건

  • Data Warehouse. Data Warehouse를 원본 데이터 저장소로 사용합니다. 없는 경우 만드는 단계에 대한 Data Warehouse 만들기를 참조하세요.
  • Lakehouse. 레이크하우스를 대상 데이터 저장소로 사용합니다. 없는 경우 만드는 단계에 대한 Lakehouse 만들기를 참조하세요. IncrementalCopy라는 이름의 폴더를 만들어 복사한 데이터를 저장합니다.

원본 서버 준비

다음 사항은 증분 복사 파이프라인을 구성하기 전에 원본 Data Warehouse에서 준비해야 하는 몇 가지 테이블 및 저장 프로시저입니다.

1. Data Warehouse에 데이터 원본 테이블 만들기

Data Warehouse에서 다음 SQL 명령을 실행하여 데이터 원본 테이블로 data_source_table이라는 테이블을 만듭니다. 이 자습서에서는 증분 복사를 수행하는 샘플 데이터로 사용합니다.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

데이터 원본 테이블의 데이터는 다음과 같이 표시됩니다.

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

이 자습서에서는 LastModifytime을 워터마크 열로 사용합니다.

2. Data Warehouse에 마지막 워터마크 값을 저장할 또 다른 테이블 만들기

  1. Data Warehouse에서 다음 SQL 명령을 실행하여 마지막 워터마크 값을 저장할 워터마크테이블이라는 테이블을 만듭니다.

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. 원본 데이터 테이블의 테이블 이름으로 마지막 워터마크의 기본 값을 설정합니다. 이 자습서에서 테이블 이름은 data_source_table이며 기본값은 1/1/2010 12:00:00 AM입니다.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. 테이블 워터마크 테이블의 데이터를 검토합니다.

    Select * from watermarktable
    

    출력:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Data Warehouse에 저장 프로시저 만들기

다음 명령을 실행하여 Data Warehouse에 저장 프로시저를 만듭니다. 이 저장 프로시저는 마지막 파이프라인 실행 후 마지막 워터마크 값을 업데이트하는 데 사용됩니다.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

증분 복사를 위한 파이프라인 구성

1단계: 파이프라인 만들기

  1. Power BI로 이동합니다.

  2. 화면 왼쪽 아래에서 Power BI 아이콘을 선택한 다음, Data Factory를 선택하여 Data Factory의 홈 페이지를 엽니다.

  3. Microsoft Fabric 작업 영역으로 이동합니다.

  4. 데이터 파이프라인을 선택한 다음 파이프라인 이름을 입력하여 새 파이프라인을 만듭니다.

    새로 만든 작업 영역의 새 데이터 파이프라인 버튼을 보여 주는 스크린샷.

    새 파이프라인을 만드는 이름을 보여주는 스크린샷.

2단계: 마지막 워터마크에 대한 조회 작업 추가

이 단계에서는 조회 작업을 만들어 마지막 워터마크 값을 가져옵니다. 이전에 설정한 1/1/2010 12:00:00 AM 기본값을 가져옵니다.

  1. 파이프라인 작업 추가를 선택하고 드롭다운 목록에서 조회를 선택합니다.

  2. 일반 탭에서 이 작업의 이름을 LookupOldWaterMarkActivity로 변경합니다.

  3. 설정 탭에서 다음 구성을 수행합니다.

    • 데이터 저장소 유형: 작업 영역을 선택합니다.
    • 작업 영역 데이터 저장소 유형: Data Warehouse를 선택합니다.
    • Data Warehouse: Data Warehouse를 선택합니다.
    • 쿼리 사용: 테이블을 선택합니다.
    • 테이블: dbo.watermarktable을 선택합니다.
    • 첫 번째 행만 해당: 선택되었습니다.

    이전 워터마크를 조회하는 스크린샷.

3단계: 새 워터마크에 대한 조회 작업 추가

이 단계에서는 새 워터마크 값을 가져오는 조회 작업을 만듭니다. 쿼리를 사용하여 원본 데이터 테이블에서 새 워터마크를 가져옵니다. data_source_tableLastModifytime 열에서 최대값이 산출됩니다.

  1. 상단 바의 작업 탭에서 조회를 선택하여 두 번째 조회 작업을 추가합니다.

  2. 일반 탭에서 이 작업의 이름을 LookupNewWaterMarkActivity로 변경합니다.

  3. 설정 탭에서 다음 구성을 수행합니다.

    • 데이터 저장소 유형: 작업 영역을 선택합니다.

    • 작업 영역 데이터 저장소 유형: Data Warehouse를 선택합니다.

    • Data Warehouse: Data Warehouse를 선택합니다.

    • 쿼리 사용: 쿼리를 선택합니다.

    • 쿼리: 다음 쿼리를 입력하여 마지막으로 수정한 최대 시간을 새 워터마크로 선택합니다.

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • 첫 번째 행만 해당: 선택되었습니다.

    새 워터마크를 조회하는 스크린샷.

4단계: 증분 데이터를 복사하는 복사 작업 추가

이 단계에서는 Data Warehouse에서 Lakehouse로 마지막 워터마크와 새 워터마크 사이에 증분 데이터를 복사하는 복사 작업을 추가합니다.

  1. 위쪽 표시줄에서 활동을 선택하고 데이터 복사 -캔버스에 추가를 선택하여 >복사 작업을 가져옵니다.

  2. 일반 탭에서 이 작업의 이름을 IncrementalCopyActivity로 변경합니다.

  3. 조회 작업에 연결되는 녹색 단추(성공 위)를 복사 작업으로 끌어서 두 조회 작업을 복사 작업에 연결합니다. 복사 활동의 테두리 색이 녹색으로 변경되면 마우스 단추를 놓습니다.

    조회 및 복사 작업 연결을 보여 주는 스크린샷.

  4. 원본 제공 탭에서 다음 구성을 수행합니다.

    • 데이터 저장소 유형: 작업 영역을 선택합니다.

    • 작업 영역 데이터 저장소 유형: Data Warehouse를 선택합니다.

    • Data Warehouse: Data Warehouse를 선택합니다.

    • 쿼리 사용: 쿼리를 선택합니다.

    • 쿼리: 마지막 워터마크와 새 워터마크 사이에 증분 데이터를 복사하려면 다음 쿼리를 입력하세요.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    데이터 원본 구성을 보여 주는 스크린샷.

  5. 설명 탭 아래에서 다음 구성을 제공합니다.

    • 데이터 저장소 유형: 작업 영역을 선택합니다.
    • 작업 영역 데이터 저장소 유형: Lakehouse를 선택합니다.
    • Lakehouse: Lakehouse를 선택하세요.
    • 루트 폴더: 파일을 선택합니다.
    • 파일 경로: 복사한 데이터를 저장할 폴더를 지정합니다. 폴더를 선택하려면 찾아보기를 선택하세요. 파일 이름의 경우 동적 콘텐츠 추가를 열고 @CONCAT('Incremental-', pipeline().RunId, '.txt')을(를) 열린 창에 입력하여 Lakehouse에서 복사한 데이터 파일의 파일 이름을 만듭니다.
    • 파일 형식: 데이터의 형식 유형을 선택합니다.

    대상 구성 복사를 보여 주는 스크린샷.

5단계:저장 프로시저 작업 추가

이 단계에서는 저장 프로시저 작업을 추가하여 다음 파이프라인 실행에 대한 마지막 워터마크 값을 업데이트합니다.

  1. 상단 표시줄에서 활동을 선택하고 저장 프로시저를 선택하여 저장 프로시저 작업을 추가합니다.

  2. 일반 탭에서 이 작업의 이름을 StoredProceduretoWriteWatermarkActivity로 변경합니다.

  3. 복사 작업의 녹색(성공 위) 출력을 저장 프로시저 작업에 연결합니다.

  4. 설정 탭에서 다음 구성을 수행합니다.

    • 데이터 저장소 유형: 작업 영역을 선택합니다.

    • Data Warehouse: Data Warehouse를 선택합니다.

    • 저장 프로시저 이름: 데이터 웨어하우스: [dbo].[usp_write_watermark].에서 만든 저장 프로시저를 지정합니다.

    • 저장 프로시저 매개 변수를 확장합니다. 저장 프로시저 매개 변수에 대한 값을 지정하려면 가져오기를 클릭하고 매개 변수에 대해 다음 값을 입력합니다.

      속성 타입
      LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName 문자열 @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    저장 프로시저 활동 구성을 보여 주는 스크린샷.

6단계:파이프라인 실행 및 결과 모니터링

상단 바의 탭에서 실행을 선택합니다. 그런 다음 저장 및 실행을 선택합니다. 파이프라인 실행이 시작되고 출력 탭에서 파이프라인을 모니터링할 수 있습니다.

파이프라인 실행 결과를 보여 주는 스크린샷.

Lakehouse로 이동하면 지정한 폴더 아래에 있는 데이터 파일을 찾고 파일을 선택하여 복사된 데이터를 미리 볼 수 있습니다.

첫 번째 파이프라인 실행에 대한 Lakehouse 데이터를 보여 주는 스크린샷.

첫 번째 파이프라인 실행에 대한 Lakehouse 데이터 미리 보기를 보여 주는 스크린샷.

더 많은 데이터를 추가하여 증분 복사 결과 확인하기

첫 번째 파이프라인 실행을 완료한 후 Data Warehouse 원본 테이블에 더 많은 데이터를 추가하여 이 파이프라인이 증분 데이터를 복사할 수 있는지 확인해 보겠습니다.

1단계: 원본에 더 많은 데이터 추가

다음 쿼리를 실행하여 Data Warehouse에 새 데이터를 삽입합니다.

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

data_source_table용 업데이트된 데이터는 다음과 같습니다.

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

2단계: 다른 파이프라인 실행 트리거 및 결과 모니터링

파이프라인 페이지로 돌아갑니다. 상단 바의 탭에서 실행을 다시 선택합니다. 파이프라인 실행이 시작되고 출력에서 파이프라인을 모니터링할 수 있습니다.

Lakehouse로 이동하여 지정한 폴더 아래에 있는 복사한 새 데이터 파일을 찾고 파일을 선택하여 복사된 데이터를 미리 볼 수 있습니다. 증분 데이터가 이 파일에 표시됩니다.

두 번째 파이프라인 실행에 대한 Lakehouse 데이터를 보여 주는 스크린샷.

두 번째 파이프라인 실행에 대한 Lakehouse 데이터 미리 보기를 보여 주는 스크린샷.

다음으로 Azure Blob Storage에서 Lakehouse로의 복사에 대해 자세히 알아보세요.