Sdílet prostřednictvím


Přírůstkové načítání dat z více tabulek v SQL Serveru do Azure SQL Database pomocí PowerShellu

PLATÍ PRO: Azure Data Factory Azure Synapse Analytics

Tip

Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.

V tomto kurzu vytvoříte službu Azure Data Factory s kanálem, který načte rozdílová data z více tabulek v databázi SQL Serveru do služby Azure SQL Database.

V tomto kurzu provedete následující kroky:

  • Příprava zdrojového a cílového datového úložiště
  • Vytvoření datové továrny
  • Vytvořte místní prostředí Integration Runtime.
  • Instalace prostředí Integration Runtime
  • Vytvoření propojených služeb
  • Vytvoření zdroje, jímky a datových sad mezí
  • Vytvoření a spuštění kanálu a jeho monitorování
  • Zkontrolujte výsledky.
  • Přidání nebo aktualizace dat ve zdrojových tabulkách
  • Opakované spuštění kanálu a jeho monitorování
  • Kontrola konečných výsledků

Přehled

Tady jsou důležité kroky pro vytvoření tohoto řešení:

  1. Vyberte sloupec meze.

    Vyberte jeden sloupec pro každou tabulku ve zdrojovém úložišti dat, které můžete identifikovat nové nebo aktualizované záznamy pro každé spuštění. Data v tomto vybraném sloupci (například čas_poslední_změny nebo ID) se při vytváření nebo aktualizaci řádků obvykle zvyšují. Maximální hodnota v tomto sloupci se používá jako horní mez.

  2. Připravte úložiště dat pro uložení hodnoty meze.

    V tomto kurzu uložíte hodnotu meze do databáze SQL.

  3. Vytvořte kanál s následujícími aktivitami:

    1. Vytvořte aktivitu ForEach, která prochází seznam názvů zdrojových tabulek, který je předaný kanálu jako parametr. Pro každou zdrojovou tabulku vyvolá následující aktivity, aby pro tabulku provedl rozdílové načtení.

    2. Vytvořte dvě aktivity vyhledávání. První aktivitu vyhledávání použijte k načtení poslední hodnoty meze. Druhou aktivitu vyhledávání použijte k načtení nové hodnoty meze. Tyto hodnoty meze se předají aktivitě kopírování.

    3. Vytvořte aktivita Copy, který kopíruje řádky ze zdrojového úložiště dat s hodnotou sloupce vodoznaku větší než stará hodnota vodoznaku a menší nebo rovna nové hodnotě vodoznaku. Potom tato rozdílová data zkopíruje ze zdrojového úložiště dat do úložiště Azure Blob Storage jako nový soubor.

    4. Vytvořte aktivitu uložené procedury StoredProcedure, která aktualizuje hodnotu meze pro příští spuštění kanálu.

    Tady je souhrnný diagram tohoto řešení:

    Přírůstkové načtení dat

Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.

Požadavky

  • SQL Server. Jako zdrojové úložiště dat v tomto kurzu použijete databázi SQL Serveru.
  • Azure SQL Database Jako úložiště dat jímky použijete databázi ve službě Azure SQL Database. Pokud databázi SQL nemáte, přečtěte si téma Vytvoření databáze ve službě Azure SQL Database , kde najdete postup jeho vytvoření.

Vytvoření zdrojových tabulek v databázi SQL Serveru

  1. Otevřete SQL Server Management Studio (SSMS) nebo Azure Data Studio a připojte se k databázi SQL Serveru.

  2. V Průzkumníku serveru (SSMS) nebo v podokně Připojení (Azure Data Studio) klikněte pravým tlačítkem myši na databázi a zvolte Nový dotaz.

  3. Spusťte na databázi následující příkaz SQL, aby se vytvořily tabulky s názvem customer_table a project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

Vytvoření cílových tabulek ve službě Azure SQL Database

  1. Otevřete SQL Server Management Studio (SSMS) nebo Azure Data Studio a připojte se k databázi SQL Serveru.

  2. V Průzkumníku serveru (SSMS) nebo v podokně Připojení (Azure Data Studio) klikněte pravým tlačítkem myši na databázi a zvolte Nový dotaz.

  3. Spusťte na databázi následující příkaz SQL, aby se vytvořily tabulky s názvem customer_table a project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

Vytvoření další tabulky ve službě Azure SQL Database pro uložení hodnoty horní meze

  1. Spuštěním následujícího příkazu SQL pro vaši databázi vytvořte tabulku s názvem watermarktable pro uložení hodnoty meze:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. Do tabulky mezí vložte hodnoty počátečních mezí pro obě zdrojové tabulky.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

Vytvoření uložené procedury ve službě Azure SQL Database

Spuštěním následujícího příkazu vytvořte uloženou proceduru v databázi. Tato uložená procedura aktualizuje hodnotu meze po každém spuštění kanálu.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Vytvoření datových typů a dalších uložených procedur ve službě Azure SQL Database

Spuštěním následujícího dotazu vytvořte ve své databázi dvě uložené procedury a dva datové typy. Slouží ke slučování dat ze zdrojových tabulek do cílových tabulek.

Abychom mohli snadno začít, použijeme tyto uložené procedury, které předávají rozdílová data prostřednictvím proměnné tabulky, a pak je sloučíme do cílového úložiště. Buďte opatrní, že neočekává, že se v proměnné tabulky uloží "velký" počet rozdílových řádků (více než 100).

Pokud potřebujete sloučit velký počet rozdílových řádků do cílového úložiště, doporučujeme použít aktivitu kopírování ke zkopírování všech rozdílových dat do dočasné "přípravné" tabulky v cílovém úložišti a pak vytvořit vlastní uloženou proceduru bez použití proměnné tabulky ke sloučení z "přípravné" tabulky do "konečné" tabulky.

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

Nainstalujte nejnovější moduly Azure PowerShellu podle pokynů v tématu Instalace a konfigurace Azure PowerShellu.

Vytvoření datové továrny

  1. Definujte proměnnou pro název skupiny prostředků, kterou použijete později v příkazech PowerShellu. Zkopírujte do PowerShellu následující text příkazu, zadejte název skupiny prostředků Azure v uvozovkách a pak příkaz spusťte. Příklad: "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Pokud již skupina prostředků existuje, nepřepisujte ji. Přiřaďte proměnné $resourceGroupName jinou hodnotu a spusťte tento příkaz znovu.

  2. Definujte proměnnou pro umístění datové továrny.

    $location = "East US"
    
  3. Pokud chcete vytvořit skupinu prostředků Azure, spusťte následující příkaz:

    New-AzResourceGroup $resourceGroupName $location
    

    Pokud již skupina prostředků existuje, nepřepisujte ji. Přiřaďte proměnné $resourceGroupName jinou hodnotu a spusťte tento příkaz znovu.

  4. Definujte proměnnou název datové továrny.

    Důležité

    Aktualizujte název datové továrny tak, aby byl globálně jedinečný. Příklad: ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. Pokud chcete vytvořit datovou továrnu, spusťte následující rutinu Set-AzDataFactoryV2 :

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Mějte na paměti následující body:

  • Název datové továrny musí být globálně jedinečný. Pokud se zobrazí následující chyba, změňte název a zkuste to znovu:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • Pro vytvoření instancí služby Data Factory musí být uživatelský účet, který použijete pro přihlášení k Azure, členem rolí přispěvatel nebo vlastník nebo správcem předplatného Azure.

  • Pokud chcete zobrazit seznam oblastí Azure, ve kterých je služba Data Factory aktuálně dostupná, na následující stránce vyberte oblasti, které vás zajímají, pak rozbalte Analýza a vyhledejte Data Factory:Dostupné produkty v jednotlivých oblastech. Úložiště dat (Azure Storage, SQL Database, SQL Managed Instance atd.) a výpočetní prostředí (Azure HDInsight atd.) používané datovou továrnou můžou být v jiných oblastech.

Vytvoření místního prostředí Integration Runtime

V této části vytvoříte místní prostředí Integration Runtime a přidružíte ho k místnímu počítači s databází SQL Serveru. Místní prostředí Integration Runtime je komponenta, která kopíruje data z SQL Serveru na vašem počítači do služby Azure SQL Database.

  1. Vytvořte proměnnou pro název prostředí Integration Runtime. Použijte jedinečný název a poznamenejte si ho. Použijete ho později v tomto kurzu.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Vytvořte místní prostředí Integration Runtime.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Tady je ukázkový výstup:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. Pokud chcete načíst stav vytvořeného prostředí Integration Runtime, spusťte následující příkaz. Potvrďte, že hodnota vlastnosti State je nastavena na NeedRegistration.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Tady je ukázkový výstup:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. Spuštěním následujícího příkazu načtěte ověřovací klíče pro registraci místního prostředí Integration Runtime ve službě Azure Data Factory v cloudu:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Tady je ukázkový výstup:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Pro registraci místního prostředí Integration Runtime, které nainstalujete na počítači v dalších krocích, zkopírujte jeden z klíčů (bez uvozovek).

Instalace nástroje Integration Runtime

  1. Pokud již na počítači máte prostředí Integration Runtime, odinstalujte ho pomocí panelu Přidat nebo odebrat programy.

  2. Na místním počítači s Windows stáhněte místní prostředí Integration Runtime. Spusťte instalaci.

  3. Na stránce Vítá vás instalace prostředí Microsoft Integration Runtime vyberte Další.

  4. Na stránce Licenční smlouva s koncovým uživatelem (EULA) přijměte podmínky a licenční smlouvu a vyberte Další.

  5. Na stránce Cílová složka vyberte Další.

  6. Na stránce Připraveno k instalaci prostředí Microsoft Integration Runtime vyberte Nainstalovat.

  7. Na stránce Dokončení instalace prostředí Microsoft Integration Runtime vyberte Dokončit.

  8. Na stránce Registrace prostředí Integration Runtime (v místním prostředí) vložte klíč, který jste uložili v předchozí části, a vyberte Zaregistrovat.

    Registrace prostředí Integration Runtime

  9. Na stránce Nový Integration Runtime (v místním prostředí) Uzel vyberte Dokončit.

  10. Po úspěšném dokončení registrace místního prostředí Integration Runtime se zobrazí následující zpráva:

    Úspěšně zaregistrováno

  11. Na stránce Registrace prostředí Integration Runtime (v místním prostředí) vyberte Spustit Správce konfigurace.

  12. Jakmile se uzel připojí ke cloudové službě, zobrazí se následující stránka:

    Stránka Uzel je připojen

  13. Teď otestujte připojení k databázi SQL Serveru.

    Karta Diagnostika

    a. Na stránce Správce konfigurace přejděte na kartu Diagnostika.

    b. Jako typ zdroje dat vyberte SqlServer.

    c. Zadejte název serveru.

    d. Zadejte název databáze.

    e. Vyberte režim ověřování.

    f. Zadejte uživatelské jméno.

    g. Zadejte heslo přidružené k uživatelskému jménu.

    h. Pokud chcete potvrdit, že se prostředí Integration Runtime může připojit k SQL Serveru, vyberte Test. Pokud je připojení úspěšné, zobrazí se zelená značka zaškrtnutí. Jestliže připojení není úspěšné, zobrazí se chybová zpráva. Opravte všechny problémy a ověřte, že se prostředí Integration Runtime může připojit k SQL Serveru.

    Poznámka:

    Poznamenejte si hodnoty pro typ ověřování, server, databázi, uživatele a heslo. Použijete je později v tomto kurzu.

Vytvoření propojených služeb

V datové továrně vytvoříte propojené služby, abyste svá úložiště dat a výpočetní služby spojili s datovou továrnou. V této části vytvoříte propojené služby s databází SQL Serveru a databází ve službě Azure SQL Database.

Vytvoření propojené služby SQL Serveru

V tomto kroku propočítáte databázi SQL Serveru s datovnou továrnou.

  1. Vytvořte soubor JSON s názvem SqlServerLinkedService.json ve složce C:\ADFTutorials\IncCopyMultiTableTutorial (vytvořte místní složky, pokud ještě neexistují) s následujícím obsahem. Vyberte správnou část na základě ověřování, které požíváte pro připojení k SQL Serveru.

    Důležité

    Vyberte správnou část na základě ověřování, které požíváte pro připojení k SQL Serveru.

    Pokud používáte ověřování SQL, zkopírujte následující definici JSON:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Pokud používáte ověřování Windows, zkopírujte následující definici JSON:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Důležité

    • Vyberte správnou část na základě ověřování, které požíváte pro připojení k SQL Serveru.
    • Nahraďte <název> prostředí Integration Runtime názvem vašeho prostředí Integration Runtime.
    • Před uložením souboru nahraďte <název> serveru, <název databáze>, <uživatelské jméno> a <heslo> hodnotami databáze SQL Serveru.
    • Pokud v názvu uživatelského účtu nebo serveru potřebujete použít znak lomítko (\), použijte řídicí znak (\). Příklad: mydomain\\myuser.
  2. V PowerShellu spusťte následující rutinu, která přepne do složky C:\ADFTutorials\IncCopyMultiTableTutorial.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. Spuštěním rutiny Set-AzDataFactoryV2LinkedService vytvořte propojenou službu AzureStorageLinkedService. V následujícím příkladu předáte hodnoty pro parametry ResourceGroupName a DataFactoryName:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Tady je ukázkový výstup:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

Vytvoření propojené služby SQL Database

  1. Ve složce C:\ADFTutorials\IncCopyMultiTableTutorial vytvořte soubor JSON s názvem AzureSQLDatabaseLinkedService.json s následujícím obsahem. (Pokud složka ADF ještě neexistuje, vytvořte ji.) Před uložením souboru nahraďte <název> serveru, <název> databáze, <uživatelské jméno> a <heslo> názvem databáze SQL Serveru, názvem databáze, uživatelským jménem a heslem.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. V PowerShellu spusťte rutinu Set-AzDataFactoryV2LinkedService a vytvořte propojenou službu AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Tady je ukázkový výstup:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Vytvoření datových sad

V tomto kroku vytvoříte datové sady, které představují zdroj dat, cíl dat a místo pro uložení hodnoty meze.

Vytvoření zdrojové datové sady

  1. Ve stejné složce vytvořte soubor JSON s názvem SourceDataset.json a s následujícím obsahem:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    Aktivita kopírování v kanálu používá místo načtení celé tabulky dotaz SQL pro načtení dat.

  2. Spuštěním rutiny Set-AzDataFactoryV2Dataset vytvořte datovou sadu SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Tady je ukázkový výstup této rutiny:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Vytvoření datové sady jímky

  1. Ve stejné složce vytvořte soubor JSON s názvem SinkDataset.json s následujícím obsahem. Element tableName je nastaven kanálem dynamicky za běhu. Aktivita ForEach v kanálu prochází seznam názvů tabulek a při každé iteraci předává název tabulky této datové sadě.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. Spuštěním rutiny Set-AzDataFactoryV2Dataset vytvořte datovou sadu SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Tady je ukázkový výstup této rutiny:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Vytvoření datové sady pro mez

V tomto kroku vytvoříte datovou sadu pro uložení hodnoty horní meze.

  1. Ve stejné složce vytvořte soubor JSON s názvem WatermarkDataset.json s následujícím obsahem:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. Spuštěním rutiny Set-AzDataFactoryV2Dataset vytvořte datovou sadu WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Tady je ukázkový výstup této rutiny:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Vytvořit kanál

Tento kanál dostává jako parametr seznam tabulek. Aktivita ForEach prochází seznam názvů tabulek a provádí následující operace:

  1. Pomocí aktivity Vyhledávání můžete načíst starou hodnotu meze (počáteční hodnotu nebo hodnotu použitou v poslední iteraci).

  2. Pomocí aktivity Vyhledávání můžete načíst novou hodnotu meze (maximální hodnota sloupce vodoznaku ve zdrojové tabulce).

  3. Pomocí aktivita Copy zkopírujte data mezi těmito dvěma hodnotami meze ze zdrojové databáze do cílové databáze.

  4. Pomocí aktivity StoredProcedure aktualizujte starou hodnotu meze, která se má použít v prvním kroku další iterace.

Vytvoření kanálu

  1. Ve stejné složce vytvořte soubor JSON s názvem IncrementalCopyPipeline.json s následujícím obsahem:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. Spuštěním rutiny Set-AzDataFactoryV2Pipeline vytvořte kanál IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Tady je ukázkový výstup:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

Spuštění kanálu

  1. Ve stejné složce vytvořte soubor parametrů s názvem Parameters.json s následujícím obsahem:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. Spusťte kanál IncrementalCopyPipeline pomocí rutiny Invoke-AzDataFactoryV2Pipeline . Zástupné znaky nahraďte vlastním názvem skupiny prostředků a názvem datové továrny.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

Monitorování kanálu

  1. Přihlaste se k portálu Azure.

  2. Vyberte Všechny služby, spusťte hledání pomocí klíčového slova Datové továrny a vyberte Datové továrny.

  3. V seznamu datových továren vyhledejte vaši datovou továrnu a vyberte ji. Otevře se stránka Datová továrna.

  4. Na stránce Datová továrna vyberte Otevřít na dlaždici Otevřít Azure Data Factory Studio a spusťte Azure Data Factory na samostatné kartě.

  5. Na domovské stránce služby Azure Data Factory vyberte Na levé straně možnost Monitorování .

    Snímek obrazovky znázorňující domovskou stránku služby Azure Data Factory

  6. Zobrazí se všechna spuštění kanálů a jejich stavy. Všimněte si, že stav spuštění kanálu v následujícím příkladu je Úspěšně. Parametry předané kanálu můžete zkontrolovat kliknutím na odkaz ve sloupci Parametry. Pokud došlo k chybě, uvidíte odkaz ve sloupci Chyba.

    Snímek obrazovky znázorňující spuštění kanálu pro datovou továrnu, včetně vašeho kanálu

  7. Když ve sloupci Akce vyberete odkaz, zobrazí se všechna spuštění aktivit kanálu.

  8. Pokud se chcete vrátit do zobrazení Spuštění kanálu, vyberte Všechna spuštění kanálu.

Kontrola výsledků

V SQL Server Management Studiu spusťte následující dotazy na cílovou databázi SQL a ověřte, že data byla ze zdrojových tabulek zkopírována do cílových tabulek:

Dotaz

select * from customer_table

Výstup

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Dotaz

select * from project_table

Výstup

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Dotaz

select * from watermarktable

Výstup

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

Všimněte si, že hodnoty mezí pro obě tabulky byly aktualizovány.

Přidání dalších dat do zdrojových tabulek

Spusťte následující dotaz na zdrojovou databázi SQL Serveru, aby se aktualizoval stávající řádek v tabulce customer_table. Vložte nový řádek do tabulky project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Opětovné spuštění kanálu

  1. Nyní spusťte znovu kanálu provedením následujícího příkazu Powershellu:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. Monitorujte spuštění kanálu podle pokynů v části Monitorování kanálu. Pokud je stav kanálu probíhající, zobrazí se v části Akce další odkaz na akci, která zruší spuštění kanálu.

  3. Kliknutím na Aktualizovat můžete aktualizovat seznam, dokud nebude spuštění kanálu úspěšné.

  4. Volitelně můžete v části Akce vybrat odkaz Zobrazit spuštění aktivit a zobrazit si všechna spuštění aktivit související s tímto spuštěním kanálu.

Kontrola konečných výsledků

V SQL Server Management Studiu spusťte následující dotazy na cílovou databázi a ověřte, že aktualizovaná/nová data byla ze zdrojových tabulek zkopírována do cílových tabulek.

Dotaz

select * from customer_table

Výstup

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Všimněte si nových hodnot položek Name a LastModifytime pro PersonID pro číslo 3.

Dotaz

select * from project_table

Výstup

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

Všimněte si, že do tabulky project_table byla přidána položka NewProject.

Dotaz

select * from watermarktable

Výstup

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

Všimněte si, že hodnoty mezí pro obě tabulky byly aktualizovány.

V tomto kurzu jste provedli následující kroky:

  • Příprava zdrojového a cílového datového úložiště
  • Vytvoření datové továrny
  • Vytvoření místního prostředí Integration Runtime (IR)
  • Instalace prostředí Integration Runtime
  • Vytvoření propojených služeb
  • Vytvoření zdroje, jímky a datových sad mezí
  • Vytvoření a spuštění kanálu a jeho monitorování
  • Zkontrolujte výsledky.
  • Přidání nebo aktualizace dat ve zdrojových tabulkách
  • Opakované spuštění kanálu a jeho monitorování
  • Kontrola konečných výsledků

Pokud se chcete dozvědět víc o transformaci dat pomocí clusteru Spark v Azure, přejděte k následujícímu kurzu: