Přírůstkové načítání dat z více tabulek v SQL Serveru do Azure SQL Database pomocí PowerShellu
PLATÍ PRO: Azure Data Factory Azure Synapse Analytics
Tip
Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.
V tomto kurzu vytvoříte službu Azure Data Factory s kanálem, který načte rozdílová data z více tabulek v databázi SQL Serveru do služby Azure SQL Database.
V tomto kurzu provedete následující kroky:
- Příprava zdrojového a cílového datového úložiště
- Vytvoření datové továrny
- Vytvořte místní prostředí Integration Runtime.
- Instalace prostředí Integration Runtime
- Vytvoření propojených služeb
- Vytvoření zdroje, jímky a datových sad mezí
- Vytvoření a spuštění kanálu a jeho monitorování
- Zkontrolujte výsledky.
- Přidání nebo aktualizace dat ve zdrojových tabulkách
- Opakované spuštění kanálu a jeho monitorování
- Kontrola konečných výsledků
Přehled
Tady jsou důležité kroky pro vytvoření tohoto řešení:
Vyberte sloupec meze.
Vyberte jeden sloupec pro každou tabulku ve zdrojovém úložišti dat, které můžete identifikovat nové nebo aktualizované záznamy pro každé spuštění. Data v tomto vybraném sloupci (například čas_poslední_změny nebo ID) se při vytváření nebo aktualizaci řádků obvykle zvyšují. Maximální hodnota v tomto sloupci se používá jako horní mez.
Připravte úložiště dat pro uložení hodnoty meze.
V tomto kurzu uložíte hodnotu meze do databáze SQL.
Vytvořte kanál s následujícími aktivitami:
Vytvořte aktivitu ForEach, která prochází seznam názvů zdrojových tabulek, který je předaný kanálu jako parametr. Pro každou zdrojovou tabulku vyvolá následující aktivity, aby pro tabulku provedl rozdílové načtení.
Vytvořte dvě aktivity vyhledávání. První aktivitu vyhledávání použijte k načtení poslední hodnoty meze. Druhou aktivitu vyhledávání použijte k načtení nové hodnoty meze. Tyto hodnoty meze se předají aktivitě kopírování.
Vytvořte aktivita Copy, který kopíruje řádky ze zdrojového úložiště dat s hodnotou sloupce vodoznaku větší než stará hodnota vodoznaku a menší nebo rovna nové hodnotě vodoznaku. Potom tato rozdílová data zkopíruje ze zdrojového úložiště dat do úložiště Azure Blob Storage jako nový soubor.
Vytvořte aktivitu uložené procedury StoredProcedure, která aktualizuje hodnotu meze pro příští spuštění kanálu.
Tady je souhrnný diagram tohoto řešení:
Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.
Požadavky
- SQL Server. Jako zdrojové úložiště dat v tomto kurzu použijete databázi SQL Serveru.
- Azure SQL Database Jako úložiště dat jímky použijete databázi ve službě Azure SQL Database. Pokud databázi SQL nemáte, přečtěte si téma Vytvoření databáze ve službě Azure SQL Database , kde najdete postup jeho vytvoření.
Vytvoření zdrojových tabulek v databázi SQL Serveru
Otevřete SQL Server Management Studio (SSMS) nebo Azure Data Studio a připojte se k databázi SQL Serveru.
V Průzkumníku serveru (SSMS) nebo v podokně Připojení (Azure Data Studio) klikněte pravým tlačítkem myši na databázi a zvolte Nový dotaz.
Spusťte na databázi následující příkaz SQL, aby se vytvořily tabulky s názvem
customer_table
aproject_table
:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime ); INSERT INTO customer_table (PersonID, Name, LastModifytime) VALUES (1, 'John','9/1/2017 12:56:00 AM'), (2, 'Mike','9/2/2017 5:23:00 AM'), (3, 'Alice','9/3/2017 2:36:00 AM'), (4, 'Andy','9/4/2017 3:21:00 AM'), (5, 'Anny','9/5/2017 8:06:00 AM'); INSERT INTO project_table (Project, Creationtime) VALUES ('project1','1/1/2015 0:00:00 AM'), ('project2','2/2/2016 1:23:00 AM'), ('project3','3/4/2017 5:16:00 AM');
Vytvoření cílových tabulek ve službě Azure SQL Database
Otevřete SQL Server Management Studio (SSMS) nebo Azure Data Studio a připojte se k databázi SQL Serveru.
V Průzkumníku serveru (SSMS) nebo v podokně Připojení (Azure Data Studio) klikněte pravým tlačítkem myši na databázi a zvolte Nový dotaz.
Spusťte na databázi následující příkaz SQL, aby se vytvořily tabulky s názvem
customer_table
aproject_table
:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime );
Vytvoření další tabulky ve službě Azure SQL Database pro uložení hodnoty horní meze
Spuštěním následujícího příkazu SQL pro vaši databázi vytvořte tabulku s názvem
watermarktable
pro uložení hodnoty meze:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );
Do tabulky mezí vložte hodnoty počátečních mezí pro obě zdrojové tabulky.
INSERT INTO watermarktable VALUES ('customer_table','1/1/2010 12:00:00 AM'), ('project_table','1/1/2010 12:00:00 AM');
Vytvoření uložené procedury ve službě Azure SQL Database
Spuštěním následujícího příkazu vytvořte uloženou proceduru v databázi. Tato uložená procedura aktualizuje hodnotu meze po každém spuštění kanálu.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Vytvoření datových typů a dalších uložených procedur ve službě Azure SQL Database
Spuštěním následujícího dotazu vytvořte ve své databázi dvě uložené procedury a dva datové typy. Slouží ke slučování dat ze zdrojových tabulek do cílových tabulek.
Abychom mohli snadno začít, použijeme tyto uložené procedury, které předávají rozdílová data prostřednictvím proměnné tabulky, a pak je sloučíme do cílového úložiště. Buďte opatrní, že neočekává, že se v proměnné tabulky uloží "velký" počet rozdílových řádků (více než 100).
Pokud potřebujete sloučit velký počet rozdílových řádků do cílového úložiště, doporučujeme použít aktivitu kopírování ke zkopírování všech rozdílových dat do dočasné "přípravné" tabulky v cílovém úložišti a pak vytvořit vlastní uloženou proceduru bez použití proměnné tabulky ke sloučení z "přípravné" tabulky do "konečné" tabulky.
CREATE TYPE DataTypeforCustomerTable AS TABLE(
PersonID int,
Name varchar(255),
LastModifytime datetime
);
GO
CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS
BEGIN
MERGE customer_table AS target
USING @customer_table AS source
ON (target.PersonID = source.PersonID)
WHEN MATCHED THEN
UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
WHEN NOT MATCHED THEN
INSERT (PersonID, Name, LastModifytime)
VALUES (source.PersonID, source.Name, source.LastModifytime);
END
GO
CREATE TYPE DataTypeforProjectTable AS TABLE(
Project varchar(255),
Creationtime datetime
);
GO
CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS
BEGIN
MERGE project_table AS target
USING @project_table AS source
ON (target.Project = source.Project)
WHEN MATCHED THEN
UPDATE SET Creationtime = source.Creationtime
WHEN NOT MATCHED THEN
INSERT (Project, Creationtime)
VALUES (source.Project, source.Creationtime);
END
Azure PowerShell
Nainstalujte nejnovější moduly Azure PowerShellu podle pokynů v tématu Instalace a konfigurace Azure PowerShellu.
Vytvoření datové továrny
Definujte proměnnou pro název skupiny prostředků, kterou použijete později v příkazech PowerShellu. Zkopírujte do PowerShellu následující text příkazu, zadejte název skupiny prostředků Azure v uvozovkách a pak příkaz spusťte. Příklad:
"adfrg"
.$resourceGroupName = "ADFTutorialResourceGroup";
Pokud již skupina prostředků existuje, nepřepisujte ji. Přiřaďte proměnné
$resourceGroupName
jinou hodnotu a spusťte tento příkaz znovu.Definujte proměnnou pro umístění datové továrny.
$location = "East US"
Pokud chcete vytvořit skupinu prostředků Azure, spusťte následující příkaz:
New-AzResourceGroup $resourceGroupName $location
Pokud již skupina prostředků existuje, nepřepisujte ji. Přiřaďte proměnné
$resourceGroupName
jinou hodnotu a spusťte tento příkaz znovu.Definujte proměnnou název datové továrny.
Důležité
Aktualizujte název datové továrny tak, aby byl globálně jedinečný. Příklad: ADFIncMultiCopyTutorialFactorySP1127.
$dataFactoryName = "ADFIncMultiCopyTutorialFactory";
Pokud chcete vytvořit datovou továrnu, spusťte následující rutinu Set-AzDataFactoryV2 :
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
Mějte na paměti následující body:
Název datové továrny musí být globálně jedinečný. Pokud se zobrazí následující chyba, změňte název a zkuste to znovu:
Set-AzDataFactoryV2 : HTTP Status Code: Conflict Error Code: DataFactoryNameInUse Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
Pro vytvoření instancí služby Data Factory musí být uživatelský účet, který použijete pro přihlášení k Azure, členem rolí přispěvatel nebo vlastník nebo správcem předplatného Azure.
Pokud chcete zobrazit seznam oblastí Azure, ve kterých je služba Data Factory aktuálně dostupná, na následující stránce vyberte oblasti, které vás zajímají, pak rozbalte Analýza a vyhledejte Data Factory:Dostupné produkty v jednotlivých oblastech. Úložiště dat (Azure Storage, SQL Database, SQL Managed Instance atd.) a výpočetní prostředí (Azure HDInsight atd.) používané datovou továrnou můžou být v jiných oblastech.
Vytvoření místního prostředí Integration Runtime
V této části vytvoříte místní prostředí Integration Runtime a přidružíte ho k místnímu počítači s databází SQL Serveru. Místní prostředí Integration Runtime je komponenta, která kopíruje data z SQL Serveru na vašem počítači do služby Azure SQL Database.
Vytvořte proměnnou pro název prostředí Integration Runtime. Použijte jedinečný název a poznamenejte si ho. Použijete ho později v tomto kurzu.
$integrationRuntimeName = "ADFTutorialIR"
Vytvořte místní prostředí Integration Runtime.
Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
Tady je ukázkový výstup:
Name : <Integration Runtime name> Type : SelfHosted ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
Pokud chcete načíst stav vytvořeného prostředí Integration Runtime, spusťte následující příkaz. Potvrďte, že hodnota vlastnosti State je nastavena na NeedRegistration.
Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
Tady je ukázkový výstup:
State : NeedRegistration Version : CreateTime : 9/24/2019 6:00:00 AM AutoUpdate : On ScheduledUpdateDate : UpdateDelayOffset : LocalTimeZoneOffset : InternalChannelEncryption : Capabilities : {} ServiceUrls : {eu.frontend.clouddatahub.net} Nodes : {} Links : {} Name : ADFTutorialIR Type : SelfHosted ResourceGroupName : <ResourceGroup name> DataFactoryName : <DataFactory name> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
Spuštěním následujícího příkazu načtěte ověřovací klíče pro registraci místního prostředí Integration Runtime ve službě Azure Data Factory v cloudu:
Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
Tady je ukázkový výstup:
{ "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=", "AuthKey2": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy=" }
Pro registraci místního prostředí Integration Runtime, které nainstalujete na počítači v dalších krocích, zkopírujte jeden z klíčů (bez uvozovek).
Instalace nástroje Integration Runtime
Pokud již na počítači máte prostředí Integration Runtime, odinstalujte ho pomocí panelu Přidat nebo odebrat programy.
Na místním počítači s Windows stáhněte místní prostředí Integration Runtime. Spusťte instalaci.
Na stránce Vítá vás instalace prostředí Microsoft Integration Runtime vyberte Další.
Na stránce Licenční smlouva s koncovým uživatelem (EULA) přijměte podmínky a licenční smlouvu a vyberte Další.
Na stránce Cílová složka vyberte Další.
Na stránce Připraveno k instalaci prostředí Microsoft Integration Runtime vyberte Nainstalovat.
Na stránce Dokončení instalace prostředí Microsoft Integration Runtime vyberte Dokončit.
Na stránce Registrace prostředí Integration Runtime (v místním prostředí) vložte klíč, který jste uložili v předchozí části, a vyberte Zaregistrovat.
Na stránce Nový Integration Runtime (v místním prostředí) Uzel vyberte Dokončit.
Po úspěšném dokončení registrace místního prostředí Integration Runtime se zobrazí následující zpráva:
Na stránce Registrace prostředí Integration Runtime (v místním prostředí) vyberte Spustit Správce konfigurace.
Jakmile se uzel připojí ke cloudové službě, zobrazí se následující stránka:
Teď otestujte připojení k databázi SQL Serveru.
a. Na stránce Správce konfigurace přejděte na kartu Diagnostika.
b. Jako typ zdroje dat vyberte SqlServer.
c. Zadejte název serveru.
d. Zadejte název databáze.
e. Vyberte režim ověřování.
f. Zadejte uživatelské jméno.
g. Zadejte heslo přidružené k uživatelskému jménu.
h. Pokud chcete potvrdit, že se prostředí Integration Runtime může připojit k SQL Serveru, vyberte Test. Pokud je připojení úspěšné, zobrazí se zelená značka zaškrtnutí. Jestliže připojení není úspěšné, zobrazí se chybová zpráva. Opravte všechny problémy a ověřte, že se prostředí Integration Runtime může připojit k SQL Serveru.
Poznámka:
Poznamenejte si hodnoty pro typ ověřování, server, databázi, uživatele a heslo. Použijete je později v tomto kurzu.
Vytvoření propojených služeb
V datové továrně vytvoříte propojené služby, abyste svá úložiště dat a výpočetní služby spojili s datovou továrnou. V této části vytvoříte propojené služby s databází SQL Serveru a databází ve službě Azure SQL Database.
Vytvoření propojené služby SQL Serveru
V tomto kroku propočítáte databázi SQL Serveru s datovnou továrnou.
Vytvořte soubor JSON s názvem SqlServerLinkedService.json ve složce C:\ADFTutorials\IncCopyMultiTableTutorial (vytvořte místní složky, pokud ještě neexistují) s následujícím obsahem. Vyberte správnou část na základě ověřování, které požíváte pro připojení k SQL Serveru.
Důležité
Vyberte správnou část na základě ověřování, které požíváte pro připojení k SQL Serveru.
Pokud používáte ověřování SQL, zkopírujte následující definici JSON:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>" }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }
Pokud používáte ověřování Windows, zkopírujte následující definici JSON:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>", "userName":"<username> or <domain>\\<username>", "password":{ "type":"SecureString", "value":"<password>" } }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }
Důležité
- Vyberte správnou část na základě ověřování, které požíváte pro připojení k SQL Serveru.
- Nahraďte <název> prostředí Integration Runtime názvem vašeho prostředí Integration Runtime.
- Před uložením souboru nahraďte <název> serveru, <název databáze>, <uživatelské jméno> a <heslo> hodnotami databáze SQL Serveru.
- Pokud v názvu uživatelského účtu nebo serveru potřebujete použít znak lomítko (
\
), použijte řídicí znak (\
). Příklad:mydomain\\myuser
.
V PowerShellu spusťte následující rutinu, která přepne do složky C:\ADFTutorials\IncCopyMultiTableTutorial.
Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
Spuštěním rutiny Set-AzDataFactoryV2LinkedService vytvořte propojenou službu AzureStorageLinkedService. V následujícím příkladu předáte hodnoty pro parametry ResourceGroupName a DataFactoryName:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
Tady je ukázkový výstup:
LinkedServiceName : SqlServerLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
Vytvoření propojené služby SQL Database
Ve složce C:\ADFTutorials\IncCopyMultiTableTutorial vytvořte soubor JSON s názvem AzureSQLDatabaseLinkedService.json s následujícím obsahem. (Pokud složka ADF ještě neexistuje, vytvořte ji.) Před uložením souboru nahraďte <název> serveru, <název> databáze, <uživatelské jméno> a <heslo> názvem databáze SQL Serveru, názvem databáze, uživatelským jménem a heslem.
{ "name":"AzureSQLDatabaseLinkedService", "properties":{ "annotations":[ ], "type":"AzureSqlDatabase", "typeProperties":{ "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;" } } }
V PowerShellu spusťte rutinu Set-AzDataFactoryV2LinkedService a vytvořte propojenou službu AzureSQLDatabaseLinkedService.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
Tady je ukázkový výstup:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
Vytvoření datových sad
V tomto kroku vytvoříte datové sady, které představují zdroj dat, cíl dat a místo pro uložení hodnoty meze.
Vytvoření zdrojové datové sady
Ve stejné složce vytvořte soubor JSON s názvem SourceDataset.json a s následujícím obsahem:
{ "name":"SourceDataset", "properties":{ "linkedServiceName":{ "referenceName":"SqlServerLinkedService", "type":"LinkedServiceReference" }, "annotations":[ ], "type":"SqlServerTable", "schema":[ ] } }
Aktivita kopírování v kanálu používá místo načtení celé tabulky dotaz SQL pro načtení dat.
Spuštěním rutiny Set-AzDataFactoryV2Dataset vytvořte datovou sadu SourceDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
Tady je ukázkový výstup této rutiny:
DatasetName : SourceDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
Vytvoření datové sady jímky
Ve stejné složce vytvořte soubor JSON s názvem SinkDataset.json s následujícím obsahem. Element tableName je nastaven kanálem dynamicky za běhu. Aktivita ForEach v kanálu prochází seznam názvů tabulek a při každé iteraci předává název tabulky této datové sadě.
{ "name":"SinkDataset", "properties":{ "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" }, "parameters":{ "SinkTableName":{ "type":"String" } }, "annotations":[ ], "type":"AzureSqlTable", "typeProperties":{ "tableName":{ "value":"@dataset().SinkTableName", "type":"Expression" } } } }
Spuštěním rutiny Set-AzDataFactoryV2Dataset vytvořte datovou sadu SinkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
Tady je ukázkový výstup této rutiny:
DatasetName : SinkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Vytvoření datové sady pro mez
V tomto kroku vytvoříte datovou sadu pro uložení hodnoty horní meze.
Ve stejné složce vytvořte soubor JSON s názvem WatermarkDataset.json s následujícím obsahem:
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
Spuštěním rutiny Set-AzDataFactoryV2Dataset vytvořte datovou sadu WatermarkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
Tady je ukázkový výstup této rutiny:
DatasetName : WatermarkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Vytvořit kanál
Tento kanál dostává jako parametr seznam tabulek. Aktivita ForEach prochází seznam názvů tabulek a provádí následující operace:
Pomocí aktivity Vyhledávání můžete načíst starou hodnotu meze (počáteční hodnotu nebo hodnotu použitou v poslední iteraci).
Pomocí aktivity Vyhledávání můžete načíst novou hodnotu meze (maximální hodnota sloupce vodoznaku ve zdrojové tabulce).
Pomocí aktivita Copy zkopírujte data mezi těmito dvěma hodnotami meze ze zdrojové databáze do cílové databáze.
Pomocí aktivity StoredProcedure aktualizujte starou hodnotu meze, která se má použít v prvním kroku další iterace.
Vytvoření kanálu
Ve stejné složce vytvořte soubor JSON s názvem IncrementalCopyPipeline.json s následujícím obsahem:
{ "name":"IncrementalCopyPipeline", "properties":{ "activities":[ { "name":"IterateSQLTables", "type":"ForEach", "dependsOn":[ ], "userProperties":[ ], "typeProperties":{ "items":{ "value":"@pipeline().parameters.tableList", "type":"Expression" }, "isSequential":false, "activities":[ { "name":"LookupOldWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"AzureSqlSource", "sqlReaderQuery":{ "value":"select * from watermarktable where TableName = '@{item().TABLE_NAME}'", "type":"Expression" } }, "dataset":{ "referenceName":"WatermarkDataset", "type":"DatasetReference" } } }, { "name":"LookupNewWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}", "type":"Expression" } }, "dataset":{ "referenceName":"SourceDataset", "type":"DatasetReference" }, "firstRowOnly":true } }, { "name":"IncrementalCopyActivity", "type":"Copy", "dependsOn":[ { "activity":"LookupOldWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] }, { "activity":"LookupNewWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'", "type":"Expression" } }, "sink":{ "type":"AzureSqlSink", "sqlWriterStoredProcedureName":{ "value":"@{item().StoredProcedureNameForMergeOperation}", "type":"Expression" }, "sqlWriterTableType":{ "value":"@{item().TableType}", "type":"Expression" }, "storedProcedureTableTypeParameterName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" }, "disableMetricsCollection":false }, "enableStaging":false }, "inputs":[ { "referenceName":"SourceDataset", "type":"DatasetReference" } ], "outputs":[ { "referenceName":"SinkDataset", "type":"DatasetReference", "parameters":{ "SinkTableName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" } } } ] }, { "name":"StoredProceduretoWriteWatermarkActivity", "type":"SqlServerStoredProcedure", "dependsOn":[ { "activity":"IncrementalCopyActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "storedProcedureName":"[dbo].[usp_write_watermark]", "storedProcedureParameters":{ "LastModifiedtime":{ "value":{ "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type":"Expression" }, "type":"DateTime" }, "TableName":{ "value":{ "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"Expression" }, "type":"String" } } }, "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" } } ] } } ], "parameters":{ "tableList":{ "type":"array" } }, "annotations":[ ] } }
Spuštěním rutiny Set-AzDataFactoryV2Pipeline vytvořte kanál IncrementalCopyPipeline.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
Tady je ukázkový výstup:
PipelineName : IncrementalCopyPipeline ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Activities : {IterateSQLTables} Parameters : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
Spuštění kanálu
Ve stejné složce vytvořte soubor parametrů s názvem Parameters.json s následujícím obsahem:
{ "tableList": [ { "TABLE_NAME": "customer_table", "WaterMark_Column": "LastModifytime", "TableType": "DataTypeforCustomerTable", "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table" }, { "TABLE_NAME": "project_table", "WaterMark_Column": "Creationtime", "TableType": "DataTypeforProjectTable", "StoredProcedureNameForMergeOperation": "usp_upsert_project_table" } ] }
Spusťte kanál IncrementalCopyPipeline pomocí rutiny Invoke-AzDataFactoryV2Pipeline . Zástupné znaky nahraďte vlastním názvem skupiny prostředků a názvem datové továrny.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
Monitorování kanálu
Přihlaste se k portálu Azure.
Vyberte Všechny služby, spusťte hledání pomocí klíčového slova Datové továrny a vyberte Datové továrny.
V seznamu datových továren vyhledejte vaši datovou továrnu a vyberte ji. Otevře se stránka Datová továrna.
Na stránce Datová továrna vyberte Otevřít na dlaždici Otevřít Azure Data Factory Studio a spusťte Azure Data Factory na samostatné kartě.
Na domovské stránce služby Azure Data Factory vyberte Na levé straně možnost Monitorování .
Zobrazí se všechna spuštění kanálů a jejich stavy. Všimněte si, že stav spuštění kanálu v následujícím příkladu je Úspěšně. Parametry předané kanálu můžete zkontrolovat kliknutím na odkaz ve sloupci Parametry. Pokud došlo k chybě, uvidíte odkaz ve sloupci Chyba.
Když ve sloupci Akce vyberete odkaz, zobrazí se všechna spuštění aktivit kanálu.
Pokud se chcete vrátit do zobrazení Spuštění kanálu, vyberte Všechna spuštění kanálu.
Kontrola výsledků
V SQL Server Management Studiu spusťte následující dotazy na cílovou databázi SQL a ověřte, že data byla ze zdrojových tabulek zkopírována do cílových tabulek:
Dotaz
select * from customer_table
Výstup
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 Alice 2017-09-03 02:36:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
Dotaz
select * from project_table
Výstup
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
Dotaz
select * from watermarktable
Výstup
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-05 08:06:00.000
project_table 2017-03-04 05:16:00.000
Všimněte si, že hodnoty mezí pro obě tabulky byly aktualizovány.
Přidání dalších dat do zdrojových tabulek
Spusťte následující dotaz na zdrojovou databázi SQL Serveru, aby se aktualizoval stávající řádek v tabulce customer_table. Vložte nový řádek do tabulky project_table.
UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3
INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');
Opětovné spuštění kanálu
Nyní spusťte znovu kanálu provedením následujícího příkazu Powershellu:
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
Monitorujte spuštění kanálu podle pokynů v části Monitorování kanálu. Pokud je stav kanálu probíhající, zobrazí se v části Akce další odkaz na akci, která zruší spuštění kanálu.
Kliknutím na Aktualizovat můžete aktualizovat seznam, dokud nebude spuštění kanálu úspěšné.
Volitelně můžete v části Akce vybrat odkaz Zobrazit spuštění aktivit a zobrazit si všechna spuštění aktivit související s tímto spuštěním kanálu.
Kontrola konečných výsledků
V SQL Server Management Studiu spusťte následující dotazy na cílovou databázi a ověřte, že aktualizovaná/nová data byla ze zdrojových tabulek zkopírována do cílových tabulek.
Dotaz
select * from customer_table
Výstup
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 NewName 2017-09-08 00:00:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
Všimněte si nových hodnot položek Name a LastModifytime pro PersonID pro číslo 3.
Dotaz
select * from project_table
Výstup
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
NewProject 2017-10-01 00:00:00.000
Všimněte si, že do tabulky project_table byla přidána položka NewProject.
Dotaz
select * from watermarktable
Výstup
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-08 00:00:00.000
project_table 2017-10-01 00:00:00.000
Všimněte si, že hodnoty mezí pro obě tabulky byly aktualizovány.
Související obsah
V tomto kurzu jste provedli následující kroky:
- Příprava zdrojového a cílového datového úložiště
- Vytvoření datové továrny
- Vytvoření místního prostředí Integration Runtime (IR)
- Instalace prostředí Integration Runtime
- Vytvoření propojených služeb
- Vytvoření zdroje, jímky a datových sad mezí
- Vytvoření a spuštění kanálu a jeho monitorování
- Zkontrolujte výsledky.
- Přidání nebo aktualizace dat ve zdrojových tabulkách
- Opakované spuštění kanálu a jeho monitorování
- Kontrola konečných výsledků
Pokud se chcete dozvědět víc o transformaci dat pomocí clusteru Spark v Azure, přejděte k následujícímu kurzu: