共用方式為


使用 PowerShell 以累加方式將 SQL Server 中多個資料表的資料載入到 Azure SQL Database

適用於:Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用

在本教學課程中,您將建立 Azure Data Factory 與管線,以將差異資料從 SQL Server 資料庫中的多個資料表載入 Azure SQL Database。

您會在本教學課程中執行下列步驟:

  • 準備來源和目的地資料存放區。
  • 建立資料處理站。
  • 建立自我裝載整合執行階段。
  • 安裝整合執行階段。
  • 建立連結的服務。
  • 建立來源、接收及水位線資料集。
  • 建立、執行及監視管線。
  • 檢閱結果。
  • 新增或更新來源資料表中的資料。
  • 重新執行,並監視管線。
  • 檢閱最終結果。

概觀

以下是建立此解決方案的重要步驟:

  1. 選取水位線資料行

    為來源資料存放區中的每個資料表各選取一個資料行,以便您在每次執行時識別新增或更新的記錄。 一般來說,當建立或更新資料列時,這個選取的資料行 (例如,last_modify_time 或 ID) 中的資料會持續增加。 此資料行中的最大值就作為水位線。

  2. 準備資料存放區來儲存水位線值

    在本教學課程中,您會將水位線值儲存在 SQL 資料庫中。

  3. 使用下列活動建立管線

    1. 建立 ForEach 活動,逐一查看以參數形式傳遞到管線的來源資料表名稱清單。 此活動會針對每個來源資料表叫用下列活動,以對該資料表執行差異載入。

    2. 建立兩個查閱活動。 使用第一個查閱活動來取出最後一個水位線值。 使用第二個查閱活動來取出新的水位線值。 這些水位線值會傳遞給複製活動。

    3. 建立複製活動,以複製來源資料存放區的資料列,而這些資料列的水位線資料行值大於舊水位線值,且小於或等於新水位線值。 然後,它會將來源資料存放區的差異資料複製到 Azure Blob 儲存體作為新檔案。

    4. 建立 StoredProcedure 活動,以更新下次執行的管線水位線值。

    高階解決方案圖表如下:

    以累加方式載入資料

如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶

必要條件

  • SQL Server。 在本教學課程中,您將使用 SQL Server 資料庫作為來源資料存放區。
  • Azure SQL Database。 您會使用 Azure SQL Database 中的資料庫作為接收資料存放區。 如果您沒有 SQL 資料庫,請參閱在 Azure SQL Database 中建立資料庫,按照步驟來建立 SQL 資料庫。

在 SQL Server 資料庫中建立來源資料表

  1. 開啟 SQL Server Management Studio (SSMS)Azure Data Studio,然後連線至 SQL Server 資料庫。

  2. 伺服器總管 (SSMS)[連線] 窗格 (Azure Data Studio) 中,以滑鼠右鍵按一下資料庫,然後選擇 [新增查詢]

  3. 對您的資料庫執行下列 SQL 命令,以建立名為 customer_tableproject_table 的資料表:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

在 Azure SQL Database 中建立目的地資料表

  1. 開啟 SQL Server Management Studio (SSMS)Azure Data Studio,然後連線至 SQL Server 資料庫。

  2. 伺服器總管 (SSMS)[連線] 窗格 (Azure Data Studio) 中,以滑鼠右鍵按一下資料庫,然後選擇 [新增查詢]

  3. 對您的資料庫執行下列 SQL 命令,以建立名為 customer_tableproject_table 的資料表:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

在 Azure SQL Database 中建立另一個資料表來儲存高水位線值

  1. 對資料庫執行下列 SQL 命令,以建立名為 watermarktable 的資料表來儲存水位線值:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. 在水位線資料表中插入這兩個來源資料表的初始水位線值。

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

在 Azure SQL Database 中建立預存程序

執行下列命令,在您的資料庫中建立預存程序。 這個預存程序會在每次管線執行之後更新水位線值。

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

在 Azure SQL Database 中建立資料類型和其他預存程序

執行下列查詢,在您的資料庫中建立兩個預存程序和兩個資料類型。 它們用來將來源資料表的資料合併到目的地資料表。

為了能輕鬆地開始這趟教學旅程,我們會直接使用這些預存程序,以透過資料表變數來傳入差異資料,然後再將這些資料合併到目的地存放區。 請注意,資料表變數中不適合存放「大量」的差異資料列 (超過 100 列)。

如果您需要將大量的差異資料列合併到目的地存放區,建議您使用複製活動,先將所有差異資料複製到目的地存放區中暫時的「暫存」資料表,然後建置您自己的預存程序 (不使用資料表變數) 來將資料從「暫存」資料表合併到「最終」資料表。

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

依照安裝和設定 Azure PowerShell 中的指示,安裝最新的 Azure PowerShell 模組。

建立資料處理站

  1. 定義資源群組名稱的變數,以便稍後在 PowerShell 命令中使用。 將下列命令文字複製到 PowerShell,以雙引號指定 Azure 資源群組的名稱,然後執行命令。 例如 "adfrg"

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    不建議您覆寫已經存在的資源群組。 將不同的值指派給 $resourceGroupName 變數,然後再執行一次命令。

  2. 定義資料處理站位置的變數。

    $location = "East US"
    
  3. 若要建立 Azure 資源群組,請執行下列命令:

    New-AzResourceGroup $resourceGroupName $location
    

    不建議您覆寫已經存在的資源群組。 將不同的值指派給 $resourceGroupName 變數,然後再執行一次命令。

  4. 定義 Data Factory 名稱的變數。

    重要

    更新資料處理站名稱,使其成為全域唯一的資料處理站。 例如,ADFIncMultiCopyTutorialFactorySP1127。

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. 若要建立資料處理站,請執行下列 Set-AzDataFactoryV2 Cmdlet:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

請注意下列幾點:

  • 資料處理站的名稱必須是全域唯一的名稱。 如果發生下列錯誤,請變更名稱,並再試一次:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • 若要建立 Data Factory 執行個體,您用來登入 Azure 的使用者帳戶必須為參與者或擁有者角色,或是 Azure 訂用帳戶的管理員。

  • 如需目前可使用 Data Factory 的 Azure 區域清單,請在下列頁面上選取您感興趣的區域,然後展開 [分析] 以找出 [Data Factory]依區域提供的產品。 資料處理站所使用的資料存放區 (Azure 儲存體、SQL Database、SQL 受控執行個體等) 和計算 (Azure HDInsight 等) 可位於其他區域。

建立自我裝載整合執行階段

在本節中,您可以建立自我裝載整合執行階段,並使用 SQL Server 資料庫將它與內部部署電腦產生關聯。 自我裝載的整合執行階段是一項元件,可將資料從您電腦上的 SQL Server 複製到 Azure SQL Database。

  1. 建立整合執行階段名稱的變數。 使用唯一的名稱,並記下此名稱。 您將在本教學課程稍後使用它。

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. 建立自我裝載整合執行階段。

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    以下是範例輸出:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. 若要擷取所建立整合執行階段的狀態,請執行下列命令。 確認 State 屬性的值已設定為 NeedRegistration

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    以下是範例輸出:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. 若要擷取驗證金鑰,用來向雲端中的 Azure Data Factory 服務註冊自我裝載的整合執行階段,請執行下列命令:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    以下是範例輸出:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. 複製其中一個金鑰 (不含雙引號),該金鑰用來註冊您在下列步驟中安裝於電腦上的自我裝載整合執行階段。

安裝整合執行階段工具

  1. 如果您的機器上已經有整合執行階段,請使用 [新增或移除程式] 將其解除安裝。

  2. 在本機 Windows 電腦上下載自我裝載的整合執行階段。 執行安裝。

  3. 在 [歡迎使用 Microsoft Integration Runtime 設定] 頁面上,選取 [下一步]

  4. 在 [使用者授權合約] 頁面上,接受條款和授權合約,然後選取 [下一步]

  5. 在 [目的地資料夾] 頁面上,選取 [下一步]

  6. 在 [準備好要安裝 Microsoft Integration Runtime] 頁面上,選取 [安裝]

  7. 在 [完成 Microsoft Integration Runtime 設定] 頁面上,選取 [完成]

  8. 在 [註冊 Integration Runtime (自我裝載)] 頁面上,貼上您在上一節中儲存的金鑰,然後選取 [註冊]

    註冊整合執行階段

  9. 在 [新增 Integration Runtime (自我裝載) 節點] 頁面上,選取 [完成]

  10. 當自我裝載的整合執行階段註冊成功時,您會看到下列訊息:

    已成功註冊

  11. 在 [註冊整合執行階段 (自我裝載)] 頁面上,選取 [啟動組態管理員]

  12. 當節點已連線至雲端服務時,您會看到下列頁面:

    節點已連線頁面

  13. 現在,測試您 SQL Server 資料庫的連線。

    診斷索引標籤

    a. 在 [組態管理員] 頁面上,移至 [診斷] 索引標籤。

    b. 選取 [SqlServer] 作為資料來源類型。

    c. 輸入伺服器名稱。

    d. 輸入資料庫名稱。

    e. 選取驗證模式。

    f. 輸入使用者名稱。

    .g 輸入與使用者名稱相關聯的密碼。

    h. 請選取 [測試],以確認整合執行階段可以連線到 SQL Server。 如果連線成功,您會看到綠色的核取記號。 如果連線不成功,您會看到一則錯誤訊息。 修正所有問題,並確定整合執行階段可連線到 SQL Server。

    注意

    請記下驗證類型、伺服器、資料庫、使用者和密碼的值。 您稍後會在本教學課程中用到。

建立連結服務

您在資料處理站中建立的連結服務會將您的資料存放區和計算服務連結到資料處理站。 在本節中,您將對 SQL Server 資料庫與 Azure SQL Database 中的資料庫建立連結服務。

建立 SQL Server 連結服務

在此步驟中,您要將 SQL Server 資料庫連結至資料處理站。

  1. 使用下列內容,在 C:\ADFTutorials\IncCopyMultiTableTutorial 資料夾中建立名為 SqlServerLinkedService.json 的 JSON 檔案 (如果該資料夾尚不存在,請建立本機資料夾)。 以您用於連線到 SQL Server 的驗證作為基礎,選取右側區段。

    重要

    以您用於連線到 SQL Server 的驗證作為基礎,選取右側區段。

    如果您使用 SQL 驗證,請複製下列 JSON 定義:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    如果您使用 Windows 驗證,請複製下列 JSON 定義:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    重要

    • 以您用於連線到 SQL Server 的驗證作為基礎,選取右側區段。
    • 將 <整合執行階段名稱> 取代為您的整合執行階段名稱。
    • 儲存檔案之前,請將 <servername>、<databasename>、<username> 和 <password> 取代為您的 SQL Server 資料庫值。
    • 如果您需要在使用者帳戶或伺服器名稱中使用斜線字元 (\),請使用逸出字元 (\)。 例如 mydomain\\myuser
  2. 在 PowerShell 中執行下列 Cmdlet 來切換至 C:\ADFTutorials\IncCopyMultiTableTutorial 資料夾。

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. 執行 Set-AzDataFactoryV2LinkedService Cmdlet 來建立連結服務 AzureStorageLinkedService。 在下列範例中,您會傳遞 ResourceGroupName 和 DataFactoryName 參數的值:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    以下是範例輸出:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

建立 SQL Database 連結服務

  1. 使用下列內容,在 C:\ADFTutorials\IncCopyMultiTableTutorial 資料夾中建立名為 AzureSQLDatabaseLinkedService.json 的 JSON 檔案。 (建立資料夾 ADF (如果尚未存在)。)儲存檔案之前,以您的 SQL Server 資料庫名稱、資料庫名稱、使用者名稱和密碼,取代 <servername>、<database name>、<user name> 和 <password>。

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. 在 PowerShell 中,執行 Set-AzDataFactoryV2LinkedService Cmdlet 來建立連結服務 AzureSQLDatabaseLinkedService。

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    以下是範例輸出:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

建立資料集

在此步驟中,您會建立資料集以代表資料來源和資料目的地,以及要儲存水位線的位置。

建立來源資料集

  1. 在相同的資料夾中,使用下列內容建立名為 SourceDataset.json 的 JSON 檔案:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    管線中的複製活動會使用 SQL 查詢來載入資料,而不會載入整個資料表。

  2. 執行 Set-AzDataFactoryV2Dataset Cmdlet 來建立資料集 SourceDataset。

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    以下是此 Cmdlet 的範例輸出:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

建立接收資料集

  1. 在相同的資料夾中,使用下列內容建立名為 SinkDataset.json 的 JSON 檔案。 tableName 元素是由管線在執行階段動態設定。 管線中的 ForEach 活動會逐一查看資料表名稱清單,並將資料表名稱傳遞至每個反覆項目中的這個資料集。

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. 執行 Set-AzDataFactoryV2Dataset Cmdlet 來建立資料集 SinkDataset。

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    以下是此 Cmdlet 的範例輸出:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

建立水位線的資料集

在此步驟中,您會建立資料集來儲存高水位線值。

  1. 在相同的資料夾中,使用下列內容建立名為 WatermarkDataset.json 的 JSON 檔案:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. 執行 Set-AzDataFactoryV2Dataset Cmdlet 來建立資料集 WatermarkDataset。

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    以下是此 Cmdlet 的範例輸出:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

建立新管線

管線會使用資料表名稱清單作為參數。 ForEach 活動會逐一查看資料表名稱清單,並執行下列作業:

  1. 使用查閱活動擷取舊的水位線值 (初始值,或最後一次反覆運算中使用的值)。

  2. 使用查閱活動擷取新的水位線值 (來源資料表中水位線資料行的最大值)。

  3. 使用複製活動,在來源資料庫到目的地資料庫的這兩個水位線值之間複製資料。

  4. 使用 StoredProcedure 活動,更新要在下一個反覆項目的第一個步驟中使用的舊水位線值。

建立管線

  1. 在相同的資料夾中,使用下列內容建立名為 IncrementalCopyPipeline.json 的 JSON 檔案:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. 執行 Set-AzDataFactoryV2Pipeline Cmdlet 來建立管線 IncrementalCopyPipeline。

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    以下是範例輸出:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

執行管線

  1. 在相同的資料夾中,使用下列內容建立名為 Parameters.json 的參數檔案:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. 使用 Invoke-AzDataFactoryV2Pipeline Cmdlet 來執行管線 IncrementalCopyPipeline。 以您自己的資源群組和資料處理站名稱取代預留位置。

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

監視管線

  1. 登入 Azure 入口網站

  2. 選取 [所有服務],以關鍵字「資料處理站」進行搜尋,然後選取 [資料處理站]

  3. 在 Data Factory 清單中搜尋您的 Data Factory,然後加以選取以開啟 [Data Factory] 頁面。

  4. 在 [資料處理站] 頁面上,選取 [開啟 Azure Data Factory Studio] 圖格上的 [開啟],以在個別的索引標籤中啟動 Azure Data Factory。

  5. 在 Azure Data Factory 首頁上,選取左側的 [監視]

    顯示 Azure Data Factory 首頁的螢幕快照。

  6. 您可以看到所有管線執行及其狀態。 請注意,在下列範例中,管線執行狀態是 [成功]。 若要檢查傳遞到管線的參數,請選取 [參數] 資料行中的連結。 如果發生錯誤,您就會在 [錯誤] 資料行中看到連結。

    此螢幕快照顯示數據處理站的管線執行,包括您的管線。

  7. 當您選取 [動作] 資料行中的連結時,您會看到管線的所有活動執行。

  8. 若要回到 [管線執行] 檢視,請選取 [所有管線執行]

檢閱結果

在 SQL Server Management Studio 中,對目標 SQL 資料庫執行下列查詢,以確認資料已從來源資料表複製到目的地資料表:

查詢

select * from customer_table

輸出

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

查詢

select * from project_table

輸出

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

查詢

select * from watermarktable

輸出

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

請注意,這兩個資料表的水位線值已更新。

新增更多資料至來源資料表

對來源 SQL Server 資料庫執行下列查詢,以更新 customer_table 中的現有資料列。 在 project_table 中插入新的資料列。

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

重新執行管線

  1. 現在,執行下列 PowerShell 命令以重新執行管線:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. 遵循監視管線一節中的指示,以監視管線執行。 當管線狀態為 [進行中] 時,您會在 [動作] 底下看到另一個動作連結,其可供取消管線執行。

  3. 選取 [重新整理] 可重新整理清單,直到管線執行成功。

  4. 選擇性地,選取 [動作] 底下的 [檢視活動執行] 連結,可查看與此管線執行相關聯的所有活動執行。

檢閱最終結果

在 SQL Server Management Studio 中,對目標資料庫執行下列查詢,以確認經過更新/全新的資料已從來源資料表複製到目的地資料表。

查詢

select * from customer_table

輸出

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

請注意,PersonID 為 3 的 NameLastModifytime 的新值。

查詢

select * from project_table

輸出

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

請注意,project_table 中已新增 NewProject 項目。

查詢

select * from watermarktable

輸出

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

請注意,這兩個資料表的水位線值已更新。

在本教學課程中,您已執行下列步驟:

  • 準備來源和目的地資料存放區。
  • 建立資料處理站。
  • 建立自我裝載的整合執行階段 (IR)。
  • 安裝整合執行階段。
  • 建立連結的服務。
  • 建立來源、接收及水位線資料集。
  • 建立、執行及監視管線。
  • 檢閱結果。
  • 新增或更新來源資料表中的資料。
  • 重新執行,並監視管線。
  • 檢閱最終結果。

進入下列教學課程,以了解如何在 Azure 上使用 Spark 叢集來轉換資料: