SQL Server의 여러 테이블에서 PowerShell을 사용해 Azure SQL Database로 데이터 증분 로드
적용 대상: Azure Data Factory Azure Synapse Analytics
팁
기업용 올인원 분석 솔루션인 Microsoft Fabric의 Data Factory를 사용해 보세요. Microsoft Fabric은 데이터 이동부터 데이터 과학, 실시간 분석, 비즈니스 인텔리전스 및 보고에 이르기까지 모든 것을 다룹니다. 무료로 새 평가판을 시작하는 방법을 알아봅니다!
이 자습서에서는 델타 데이터를 SQL Server 데이터베이스의 여러 테이블에서 Azure SQL Database로 로드하는 파이프라인이 있는 Azure Data Factory를 만듭니다.
이 자습서에서 수행하는 단계는 다음과 같습니다.
- 원본 및 대상 데이터 저장소를 준비합니다.
- 데이터 팩터리를 만듭니다.
- 자체 호스팅 통합 런타임 만들기
- Integration Runtime을 설치합니다.
- 연결된 서비스만들기.
- 원본, 싱크 및 워터마크 데이터 세트를 만듭니다.
- 파이프라인을 만들고 실행하고 모니터링합니다.
- 결과를 검토합니다.
- 원본 테이블의 데이터를 추가 또는 업데이트합니다.
- 파이프라인을 다시 실행하고 모니터링합니다.
- 최종 결과를 검토합니다.
개요
이 솔루션을 만드는 중요한 단계는 다음과 같습니다.
워터마크 열을 선택합니다.
원본 데이터 저장소에서 각 테이블에 대해 하나의 열을 선택합니다. 이 열은 모든 실행에 대해 새 레코드 또는 업데이트된 레코드를 식별할 수 있습니다. 선택한 이 열의 데이터(예: last_modify_time 또는 ID)는 일반적으로 행을 만들거나 업데이트할 때 계속 증가합니다. 이 열의 최대 값은 워터마크로 사용됩니다.
워터마크 값을 저장할 데이터 저장소를 준비합니다.
이 자습서에서는 SQL 데이터베이스에 워터마크 값을 저장합니다.
다음 작업을 사용하여 파이프라인을 만듭니다.
파이프라인에 매개 변수로 전달되는 원본 테이블 이름 목록을 반복하는 ForEach 작업을 만듭니다. 각 원본 테이블에 해당 테이블에 대한 델타 로드를 수행하는 다음 작업을 호출합니다.
두 가지 조회 작업을 만듭니다. 첫 번째 조회 작업을 사용하여 마지막 워터마크 값을 검색합니다. 두 번째 조회 작업을 사용하여 새 워터마크 값을 검색합니다. 이러한 워터마크 값은 복사 작업에 전달됩니다.
이전 워터마크 값보다 크고, 새 워터마크 값보다 작거나 같은 워터마크 열 값으로 원본 데이터 저장소의 행을 복사하는 복사 작업을 만듭니다. 그런 다음 원본 데이터 스토리지의 델타 데이터를 새 파일로 Azure Blob Storage에 복사합니다.
다음에 실행되는 파이프라인에 대한 워터마크 값을 업데이트하는 StoredProcedure 작업을 만듭니다.
대략적인 솔루션 다이어그램은 다음과 같습니다.
Azure 구독이 아직 없는 경우 시작하기 전에 체험 계정을 만듭니다.
사전 요구 사항
- SQL Server. 이 자습서에서는 SQL Server 데이터베이스를 원본 데이터 저장소로 사용합니다.
- Azure SQL Database. Azure SQL Database의 데이터베이스를 싱크 데이터 저장소로 사용합니다. SQL 데이터베이스가 없는 경우 만드는 단계는 Azure SQL Database에서 데이터베이스 만들기를 참조하세요.
SQL Server 데이터베이스에 원본 테이블 만들기
SSMS(SQL Server Management Studio) 또는 Azure Data Studio를 열고 SQL Server 데이터베이스에 연결합니다.
서버 탐색기(SSMS) 또는 연결 창(Azure Data Studio)에서 데이터베이스를 마우스 오른쪽 단추로 클릭하고 새 쿼리을 선택합니다.
데이터베이스에 대해 다음 SQL 명령을 실행하여
customer_table
및project_table
(이)라는 테이블을 만듭니다.create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime ); INSERT INTO customer_table (PersonID, Name, LastModifytime) VALUES (1, 'John','9/1/2017 12:56:00 AM'), (2, 'Mike','9/2/2017 5:23:00 AM'), (3, 'Alice','9/3/2017 2:36:00 AM'), (4, 'Andy','9/4/2017 3:21:00 AM'), (5, 'Anny','9/5/2017 8:06:00 AM'); INSERT INTO project_table (Project, Creationtime) VALUES ('project1','1/1/2015 0:00:00 AM'), ('project2','2/2/2016 1:23:00 AM'), ('project3','3/4/2017 5:16:00 AM');
Azure SQL Database에 대상 테이블 만들기
SSMS(SQL Server Management Studio) 또는 Azure Data Studio를 열고 SQL Server 데이터베이스에 연결합니다.
서버 탐색기(SSMS) 또는 연결 창(Azure Data Studio)에서 데이터베이스를 마우스 오른쪽 단추로 클릭하고 새 쿼리을 선택합니다.
데이터베이스에 대해 다음 SQL 명령을 실행하여
customer_table
및project_table
(이)라는 테이블을 만듭니다.create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime );
상위 워터마크 값을 저장하기 위해 다른 테이블을 Azure SQL Database에 만들기
데이터베이스에 대해 다음 SQL 명령을 실행하여 워터마크 값을 저장할
watermarktable
테이블을 만듭니다.create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );
두 원본 테이블의 초기 워터마크 값을 워터마크 테이블에 삽입합니다.
INSERT INTO watermarktable VALUES ('customer_table','1/1/2010 12:00:00 AM'), ('project_table','1/1/2010 12:00:00 AM');
Azure SQL Database에 저장 프로시저 만들기
다음 명령을 실행하여 데이터베이스에 저장 프로시저를 만듭니다. 이 저장 프로시저는 파이프라인의 실행이 끝날 때마다 워터마크 값을 업데이트합니다.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Azure SQL Database에 데이터 형식 및 추가 저장 프로시저 만들기
다음 쿼리를 실행하여 데이터베이스에 두 개의 데이터 형식과 두 개의 저장 프로시저를 만듭니다. 원본 테이블의 데이터를 대상 테이블에 병합하는 데 사용됩니다.
여정을 쉽게 시작할 수 있도록 테이블 변수를 통해 델타 데이터를 전달하는 이러한 저장 프로시저를 직접 사용한 다음, 대상 저장소에 병합합니다. 테이블 변수에 "많은" 수(100개 초과)의 델타 행이 저장될 수 없으니 주의하세요.
많은 수의 델타 행을 대상 저장소에 병합해야 하는 경우, 먼저 복사 작업을 사용하여 모든 델타 데이터를 대상 저장소의 임시 "준비" 테이블에 복사한 다음, 테이블 변수를 사용하지 않고 저장 프로시저를 직접 빌드하여 “준비” 테이블에서 “최종” 테이블로 병합하는 것이 좋습니다.
CREATE TYPE DataTypeforCustomerTable AS TABLE(
PersonID int,
Name varchar(255),
LastModifytime datetime
);
GO
CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS
BEGIN
MERGE customer_table AS target
USING @customer_table AS source
ON (target.PersonID = source.PersonID)
WHEN MATCHED THEN
UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
WHEN NOT MATCHED THEN
INSERT (PersonID, Name, LastModifytime)
VALUES (source.PersonID, source.Name, source.LastModifytime);
END
GO
CREATE TYPE DataTypeforProjectTable AS TABLE(
Project varchar(255),
Creationtime datetime
);
GO
CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS
BEGIN
MERGE project_table AS target
USING @project_table AS source
ON (target.Project = source.Project)
WHEN MATCHED THEN
UPDATE SET Creationtime = source.Creationtime
WHEN NOT MATCHED THEN
INSERT (Project, Creationtime)
VALUES (source.Project, source.Creationtime);
END
Azure PowerShell
Azure PowerShell 설치 및 구성의 지침에 따라 최신 Azure PowerShell 모듈을 설치합니다.
데이터 팩터리 만들기
나중에 PowerShell 명령에서 사용할 리소스 그룹 이름에 대한 변수를 정의합니다. PowerShell에 다음 명령 텍스트를 복사하고, 큰따옴표에 Azure 리소스 그룹의 이름을 지정한 다음 명령을 실행합니다. 예제는
"adfrg"
입니다.$resourceGroupName = "ADFTutorialResourceGroup";
리소스 그룹이 이미 있는 경우 덮어쓰지 않는 것이 좋습니다.
$resourceGroupName
변수에 다른 값을 할당하고 명령을 다시 실행합니다.데이터 팩터리의 위치에 대한 변수를 정의합니다.
$location = "East US"
새 리소스 그룹을 만들려면 다음 명령을 실행합니다.
New-AzResourceGroup $resourceGroupName $location
리소스 그룹이 이미 있는 경우 덮어쓰지 않는 것이 좋습니다.
$resourceGroupName
변수에 다른 값을 할당하고 명령을 다시 실행합니다.데이터 팩터리 이름에 대한 변수를 정의합니다.
Important
데이터 팩터리 이름을 전역적으로 고유하게 업데이트합니다. 예: ADFIncMultiCopyTutorialFactorySP1127.
$dataFactoryName = "ADFIncMultiCopyTutorialFactory";
데이터 팩터리를 만들려면 다음 Set-AzDataFactoryV2 cmdlet을 실행합니다.
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
다음 사항에 유의하세요.
데이터 팩터리 이름은 전역적으로 고유해야 합니다. 다음 오류가 표시되면 이름을 변경하고 다시 시도합니다.
Set-AzDataFactoryV2 : HTTP Status Code: Conflict Error Code: DataFactoryNameInUse Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
Data Factory 인스턴스를 만들려면 Azure에 로그인하는 데 사용할 사용자 계정이 참여자 또는 소유자 역할의 구성원이거나, Azure 구독의 관리자여야 합니다.
Data Factory를 현재 사용할 수 있는 Azure 지역 목록을 보려면 다음 페이지에서 관심 있는 지역을 선택한 다음, Analytics를 펼쳐서 Data Factory: 지역별 사용 가능한 제품을 찾습니다. 데이터 팩터리에서 사용되는 데이터 저장소(Azure Storage, SQL Database, SQL Managed Instance 등) 및 컴퓨팅(Azure HDInsight 등)은 다른 지역에 있을 수 있습니다.
자체 호스팅 Integration Runtime 만들기
이 섹션에서는 자체 호스팅 Integration Runtime을 만들고 이를 Microsoft SQL Server 데이터베이스와 함께 온-프레미스 컴퓨터에 연결합니다. 자체 호스팅 통합 런타임은 머신의 SQL Server에서 Azure SQL Database로 데이터를 복사하는 구성 요소입니다.
통합 런타임의 이름에 대한 변수를 만듭니다. 고유한 이름을 사용하고 기록해 둡니다. 이 자습서의 뒷부분에서 사용합니다.
$integrationRuntimeName = "ADFTutorialIR"
자체 호스팅 통합 런타임 만들기
Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
샘플 출력은 다음과 같습니다.
Name : <Integration Runtime name> Type : SelfHosted ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
만든 통합 런타임의 상태를 검색하려면 다음 명령을 실행합니다. State 속성의 값이 NeedRegistration으로 설정되어 있는지 확인합니다.
Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
샘플 출력은 다음과 같습니다.
State : NeedRegistration Version : CreateTime : 9/24/2019 6:00:00 AM AutoUpdate : On ScheduledUpdateDate : UpdateDelayOffset : LocalTimeZoneOffset : InternalChannelEncryption : Capabilities : {} ServiceUrls : {eu.frontend.clouddatahub.net} Nodes : {} Links : {} Name : ADFTutorialIR Type : SelfHosted ResourceGroupName : <ResourceGroup name> DataFactoryName : <DataFactory name> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
자체 호스팅 통합 런타임을 클라우드의 Azure Data Factory 서비스에 등록하는 데 사용되는 인증 키를 검색하려면 다음 명령을 실행합니다.
Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
샘플 출력은 다음과 같습니다.
{ "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=", "AuthKey2": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy=" }
다음 단계에서 컴퓨터에 설치할 자체 호스팅된 통합 런타임을 등록하는 데 사용되는 키 중 하나를 복사합니다(큰 따옴표 제외).
통합 런타임 도구 설치
컴퓨터에 통합 런타임이 이미 있는 경우 프로그램 추가/제거를 사용하여 제거합니다.
로컬 Windows 컴퓨터에 자체 호스팅된 통합 런타임을 다운로드합니다. 설치를 실행합니다.
Microsoft Integration Runtime 설치 시작 페이지에서 다음을 선택합니다.
최종 사용자 사용권 계약 페이지에서 사용권 계약에 동의하고 다음을 클릭합니다.
대상 폴더 페이지에서 다음을 선택합니다.
Microsoft Integration Runtime을 설치할 준비가 됨 페이지에서 설치를 선택합니다.
Microsoft Integration Runtime 설치 완료 페이지에서 마침을 선택합니다.
통합 런타임(자체 호스팅) 등록 페이지에 이전 섹션에서 저장한 키를 붙여넣고 등록을 선택합니다.
새 통합 런타임(자체 호스팅) 노드 페이지에서 마침을 선택합니다.
자체 호스팅 통합 런타임이 성공적으로 등록되면 다음 메시지가 표시됩니다.
통합 런타임(자체 호스팅) 등록 페이지에서 구성 관리자 시작을 선택합니다.
노드가 클라우드 서비스에 연결되면 다음 페이지가 표시됩니다.
이제, Microsoft SQL Server 데이터베이스에 대한 연결을 테스트합니다.
a. 구성 관리자 페이지에서 진단 탭으로 이동합니다.
b. 데이터 원본 유형에 대한 SqlServer를 선택합니다.
c. 서버 이름을 입력합니다.
d. 데이터베이스 이름을 입력합니다.
e. 인증 모드를 선택합니다.
f. 사용자 이름을 입력합니다.
g. 사용자 이름과 연결된 암호를 입력합니다.
h. 통합 런타임을 SQL Server에 연결할 수 있는지 확인하려면 테스트를 선택합니다. 연결이 성공하면 녹색 확인 표시가 나타납니다. 연결이 실패하면 오류 메시지가 나타납니다. 모든 문제를 해결하고 통합 런타임을 SQL Server에 연결할 수 있는지 확인합니다.
참고 항목
인증 유형, 서버, 데이터베이스, 사용자 및 암호의 값을 기록해둡니다. 이 자습서의 뒷부분에서 사용됩니다.
연결된 서비스 생성
데이터 팩터리에서 연결된 서비스를 만들어 데이터 저장소를 연결하고 컴퓨팅 서비스를 데이터 팩터리에 연결합니다. 이 섹션에서는 SQL Server 데이터베이스 및 Azure SQL Database의 데이터베이스에 연결된 서비스를 만듭니다.
SQL Server에 연결된 서비스 만들기
이 단계에서는 SQL Server 데이터베이스를 데이터 팩터리에 연결합니다.
C:\ADFTutorials\IncCopyMultiTableTutorial 폴더에 다음 내용이 포함된 SqlServerLinkedService.json이라는 JSON 파일을 만듭니다(아직 없는 경우 로컬 폴더 생성). Microsoft SQL Server에 연결하는 데 사용하는 인증에 기초하여 올바른 선택을 합니다.
Important
Microsoft SQL Server에 연결하는 데 사용하는 인증에 기초하여 올바른 선택을 합니다.
SQL 인증을 사용하는 경우 다음 JSON 정의를 복사합니다.
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>" }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }
Windows 인증을 사용하는 경우 다음 JSON 정의를 복사합니다.
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>", "userName":"<username> or <domain>\\<username>", "password":{ "type":"SecureString", "value":"<password>" } }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }
Important
- Microsoft SQL Server에 연결하는 데 사용하는 인증에 기초하여 올바른 선택을 합니다.
- <integration runtime name>을 사용자의 통합 런타임 이름으로 바꿉니다.
- 파일을 저장하기 전에 <servername>, <databasename>, <username> 및 <password>를 SQL Server 데이터베이스의 사용자 값으로 바꿉니다.
- 슬래시 문자(
\
)를 사용자 계정 또는 서버 이름에 사용해야 할 경우 이스케이프 문자(\
)를 사용합니다. 예제는mydomain\\myuser
입니다.
PowerShell에서 다음 cmdlet을 실행하여 C:\ADFTutorials\IncCopyMultiTableTutorial 폴더로 전환합니다.
Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
Set-AzDataFactoryV2LinkedService cmdlet을 실행하여 AzureStorageLinkedService라는 연결된 서비스를 만듭니다. 다음 예제에서는 ResourceGroupName 및 DataFactoryName 매개 변수에 대한 값을 전달합니다.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
샘플 출력은 다음과 같습니다.
LinkedServiceName : SqlServerLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
SQL Database에 연결된 서비스 만들기
C:\ADFTutorials\IncCopyMultiTableTutorial 폴더에 다음 내용이 포함된 AzureSQLDatabaseLinkedService.json이라는 JSON 파일을 만듭니다. (ADF 폴더가 없으면 해당 폴더를 만듭니다.) 파일을 저장하기 전에 <servername>, <database name>, <user name> 및 <password>를 SQL Server 데이터베이스 이름, 데이터베이스 이름, 사용자 이름 및 암호로 바꿉니다.
{ "name":"AzureSQLDatabaseLinkedService", "properties":{ "annotations":[ ], "type":"AzureSqlDatabase", "typeProperties":{ "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;" } } }
PowerShell에서 Set-AzDataFactoryV2LinkedService cmdlet을 실행하여 AzureSQLDatabaseLinkedService라는 연결된 서비스를 만듭니다.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
샘플 출력은 다음과 같습니다.
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
데이터 세트 만들기
이 단계에서는 데이터 원본, 데이터 대상 및 워터마크를 저장할 위치를 나타내는 데이터 세트를 만듭니다.
원본 데이터 세트 만들기
동일한 폴더에 다음 내용이 포함된 SourceDataset.json이라는 JSON 파일을 만듭니다.
{ "name":"SourceDataset", "properties":{ "linkedServiceName":{ "referenceName":"SqlServerLinkedService", "type":"LinkedServiceReference" }, "annotations":[ ], "type":"SqlServerTable", "schema":[ ] } }
파이프라인의 복사 작업은 전체 테이블을 로드하는 대신 SQL 쿼리를 사용하여 데이터를 로드합니다.
Set-AzDataFactoryV2Dataset cmdlet을 실행하여 SourceDataset 데이터 세트를 만듭니다.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
cmdlet의 샘플 출력은 다음과 같습니다.
DatasetName : SourceDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
싱크 데이터 세트 만들기
동일한 폴더에 다음 내용이 포함된 SinkDataset.json이라는 JSON 파일을 만듭니다. tableName 요소는 런타임에 동적으로 파이프라인에 의해 설정됩니다. 파이프라인의 ForEach 작업은 테이블 이름 목록을 반복하고 반복할 때마다 테이블 이름을 이 데이터 세트에 전달합니다.
{ "name":"SinkDataset", "properties":{ "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" }, "parameters":{ "SinkTableName":{ "type":"String" } }, "annotations":[ ], "type":"AzureSqlTable", "typeProperties":{ "tableName":{ "value":"@dataset().SinkTableName", "type":"Expression" } } } }
Set-AzDataFactoryV2Dataset cmdlet을 실행하여 SinkDataset 데이터 세트를 만듭니다.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
cmdlet의 샘플 출력은 다음과 같습니다.
DatasetName : SinkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
워터마크에 대한 데이터 세트 만들기
이 단계에서는 상위 워터마크 값을 저장하기 위한 데이터 세트를 만듭니다.
동일한 폴더에 다음 내용이 포함된 WatermarkDataset.json이라는 JSON 파일을 만듭니다.
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
Set-AzDataFactoryV2Dataset cmdlet을 실행하여 WatermarkDataset 데이터 세트를 만듭니다.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
cmdlet의 샘플 출력은 다음과 같습니다.
DatasetName : WatermarkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
파이프라인을 만듭니다.
파이프라인에서는 테이블 이름 목록을 매개 변수로 사용합니다. ForEach 작업은 테이블 이름 목록을 반복하고 다음 작업을 수행합니다.
Lookup 작업을 사용하여 이전 워터마크 값(초기 값 또는 마지막 반복에서 사용된 값)을 검색합니다.
Lookup 작업을 사용하여 새로운 워터마크 값(원본 테이블의 워터마크 열의 최댓값)을 검색합니다.
Copy 작업을 사용하여 원본 데이터베이스에서 대상 데이터베이스로 2개의 워터마크 값 사이에 데이터를 복사합니다.
StoredProcedure 작업을 사용하여 다음 반복의 첫 번째 단계에 사용할 이전 워터마크 값을 업데이트합니다.
파이프라인 만들기
동일한 폴더에 다음 내용이 포함된 IncrementalCopyPipeline.json이라는 JSON 파일을 만듭니다.
{ "name":"IncrementalCopyPipeline", "properties":{ "activities":[ { "name":"IterateSQLTables", "type":"ForEach", "dependsOn":[ ], "userProperties":[ ], "typeProperties":{ "items":{ "value":"@pipeline().parameters.tableList", "type":"Expression" }, "isSequential":false, "activities":[ { "name":"LookupOldWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"AzureSqlSource", "sqlReaderQuery":{ "value":"select * from watermarktable where TableName = '@{item().TABLE_NAME}'", "type":"Expression" } }, "dataset":{ "referenceName":"WatermarkDataset", "type":"DatasetReference" } } }, { "name":"LookupNewWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}", "type":"Expression" } }, "dataset":{ "referenceName":"SourceDataset", "type":"DatasetReference" }, "firstRowOnly":true } }, { "name":"IncrementalCopyActivity", "type":"Copy", "dependsOn":[ { "activity":"LookupOldWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] }, { "activity":"LookupNewWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'", "type":"Expression" } }, "sink":{ "type":"AzureSqlSink", "sqlWriterStoredProcedureName":{ "value":"@{item().StoredProcedureNameForMergeOperation}", "type":"Expression" }, "sqlWriterTableType":{ "value":"@{item().TableType}", "type":"Expression" }, "storedProcedureTableTypeParameterName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" }, "disableMetricsCollection":false }, "enableStaging":false }, "inputs":[ { "referenceName":"SourceDataset", "type":"DatasetReference" } ], "outputs":[ { "referenceName":"SinkDataset", "type":"DatasetReference", "parameters":{ "SinkTableName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" } } } ] }, { "name":"StoredProceduretoWriteWatermarkActivity", "type":"SqlServerStoredProcedure", "dependsOn":[ { "activity":"IncrementalCopyActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "storedProcedureName":"[dbo].[usp_write_watermark]", "storedProcedureParameters":{ "LastModifiedtime":{ "value":{ "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type":"Expression" }, "type":"DateTime" }, "TableName":{ "value":{ "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"Expression" }, "type":"String" } } }, "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" } } ] } } ], "parameters":{ "tableList":{ "type":"array" } }, "annotations":[ ] } }
Set-AzDataFactoryV2Pipeline cmdlet을 실행하여 IncrementalCopyPipeline 파이프라인을 만듭니다.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
샘플 출력은 다음과 같습니다.
PipelineName : IncrementalCopyPipeline ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Activities : {IterateSQLTables} Parameters : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
파이프라인 실행
동일한 폴더에 다음 내용이 포함된 Parameters.json이라는 매개 변수 파일을 만듭니다.
{ "tableList": [ { "TABLE_NAME": "customer_table", "WaterMark_Column": "LastModifytime", "TableType": "DataTypeforCustomerTable", "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table" }, { "TABLE_NAME": "project_table", "WaterMark_Column": "Creationtime", "TableType": "DataTypeforProjectTable", "StoredProcedureNameForMergeOperation": "usp_upsert_project_table" } ] }
Invoke-AzDataFactoryV2Pipeline cmdlet을 사용하여 IncrementalCopyPipeline 파이프라인을 실행합니다. 자리 표시자를 사용자의 리소스 그룹 및 데이터 팩터리 이름으로 바꿉니다.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
파이프라인 모니터링
Azure Portal에 로그인합니다.
모든 서비스를 선택하고 데이터 팩터리 키워드를 사용하여 검색하고 데이터 팩터리를 선택합니다.
데이터 팩터리 목록에서 데이터 팩터리를 검색하고 선택하여 데이터 팩터리 페이지를 시작합니다.
데이터 팩터리 페이지의 Azure Data Factory Studio 열기 타일에서 열기를 선택하여 별도 탭에서 Azure Data Factory를 시작합니다.
Azure Data Factory 홈페이지의 왼쪽에서 모니터링을 선택합니다.
모든 파이프라인 실행과 해당 상태를 볼 수 있습니다. 다음 예제에서 파이프라인 실행의 상태는 성공입니다. 파이프라인에 전달된 매개 변수를 확인하려면 매개 변수 열의 링크를 선택합니다. 오류가 발생하면 오류 열에 링크가 표시됩니다.
작업 열에 있는 링크를 선택하면 파이프라인에 대한 모든 활동 실행이 표시됩니다.
파이프라인 실행 보기로 돌아가려면 모든 파이프라인 실행을 선택합니다.
결과 검토
SQL Server Management Studio에서 대상 SQL 데이터베이스에 대해 다음 쿼리를 실행하여 데이터가 원본 테이블에서 대상 테이블로 복사되었는지 확인합니다.
쿼리
select * from customer_table
출력
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 Alice 2017-09-03 02:36:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
쿼리
select * from project_table
출력
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
쿼리
select * from watermarktable
출력
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-05 08:06:00.000
project_table 2017-03-04 05:16:00.000
두 테이블의 워터마크 값이 업데이트되었습니다.
원본 테이블에 데이터 추가
원본 SQL Server 데이터베이스에 대해 다음 쿼리를 실행하여 customer_table의 기존 행을 업데이트합니다. project_table에 새 행을 삽입합니다.
UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3
INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');
파이프라인 다시 실행
이제 다음 PowerShell 명령을 실행하여 파이프라인을 다시 실행합니다.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
파이프라인 모니터링 섹션의 지침에 따라 파이프라인 실행을 모니터링합니다. 파이프라인 상태가 진행 중이면 작업 아래 파이프라인 실행을 취소하는 다른 작업 링크가 표시됩니다.
새로 고침을 선택하여 파이프라인 실행이 성공할 때까지 목록을 새로 고칩니다.
필요에 따라, 작업 아래 View Activity Runs(작업 실행 보기) 링크를 클릭하여 이 파이프라인 실행과 연결된 모든 작업 실행을 표시합니다.
최종 결과 검토
SQL Server Management Studio에서 대상 데이터베이스에 대해 다음 쿼리를 실행하여 업데이트된/새로운 데이터가 원본 테이블에서 대상 테이블로 복사되었는지 확인합니다.
쿼리
select * from customer_table
출력
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 NewName 2017-09-08 00:00:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
3번 PersonID에 대한 Name 및 LastModifytime의 새 값을 확인합니다.
쿼리
select * from project_table
출력
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
NewProject 2017-10-01 00:00:00.000
NewProject 항목이 project_table에 추가되었습니다.
쿼리
select * from watermarktable
출력
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-08 00:00:00.000
project_table 2017-10-01 00:00:00.000
두 테이블의 워터마크 값이 업데이트되었습니다.
관련 콘텐츠
이 자습서에서 다음 단계를 수행했습니다.
- 원본 및 대상 데이터 저장소를 준비합니다.
- 데이터 팩터리를 만듭니다.
- 자체 호스팅 IR(통합 런타임)을 만듭니다.
- Integration Runtime을 설치합니다.
- 연결된 서비스만들기.
- 원본, 싱크 및 워터마크 데이터 세트를 만듭니다.
- 파이프라인을 만들고 실행하고 모니터링합니다.
- 결과를 검토합니다.
- 원본 테이블의 데이터를 추가 또는 업데이트합니다.
- 파이프라인을 다시 실행하고 모니터링합니다.
- 최종 결과를 검토합니다.
Azure에서 Spark 클러스터를 사용하여 데이터를 변환하는 방법을 알아보려면 다음 자습서로 진행하세요.