Caricare dati in modo incrementale da più tabelle in SQL Server al database SQL di Azure con PowerShell
SI APPLICA A: Azure Data Factory Azure Synapse Analytics
Suggerimento
Provare Data Factory in Microsoft Fabric, una soluzione di analisi all-in-one per le aziende. Microsoft Fabric copre tutto, dallo spostamento dati al data science, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Vedere le informazioni su come iniziare una nuova prova gratuita!
In questa esercitazione si crea un'istanza di Azure Data Factory con una pipeline che carica dati differenziali da più tabelle di un database di SQL Server in Database SQL di Azure.
In questa esercitazione vengono completati i passaggi seguenti:
- Preparare gli archivi dati di origine e di destinazione.
- Creare una data factory.
- Creare un runtime di integrazione self-hosted.
- Installare il runtime di integrazione.
- Creare servizi collegati.
- Creare i set di dati di origine, sink e limite.
- Creare, eseguire e monitorare una pipeline.
- Esaminare i risultati.
- Aggiungere o aggiornare dati nelle tabelle di origine.
- Rieseguire e monitorare la pipeline.
- Esaminare i risultati finali.
Panoramica
Di seguito sono descritti i passaggi fondamentali per la creazione di questa soluzione:
Selezionare la colonna del limite.
Selezionare una colonna per ogni tabella nell'archivio dati di origine, in cui identificare i record nuovi o aggiornati per ogni esecuzione. I dati della colonna selezionata (ad esempio last_modify_time o ID) continuano in genere ad aumentare quando le righe vengono create o aggiornate. Il valore massimo di questa colonna viene usato come limite.
Preparare un archivio dati per l'archiviazione del valore limite.
In questa esercitazione si archivia il valore limite in un database SQL.
Creare una pipeline con le attività seguenti:
Creare un'attività ForEach che esegue l'iterazione di un elenco di nomi di tabella di origine passato come parametro alla pipeline. Per ogni tabella di origine, l'attività richiama le attività seguenti per eseguire il caricamento differenziale per la tabella.
Creare due attività di ricerca. Usare la prima attività di ricerca per recuperare l'ultimo valore limite. Usare la seconda attività di ricerca per recuperare il nuovo valore limite. Questi valori limite vengono passati all'attività di copia.
Creare un attività Copy che copia le righe dall'archivio dati di origine con il valore della colonna filigrana maggiore del valore limite precedente e minore o uguale al nuovo valore limite. L'attività copierà quindi i dati delta dall'archivio dati di origine a un archivio BLOB di Azure come nuovo file.
Creare un'attività stored procedure che aggiorni il valore limite per la pipeline che verrà eseguita la volta successiva.
Il diagramma generale della soluzione è il seguente:
Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
Prerequisiti
- SQL Server. In questa esercitazione si usa un database di SQL Server come archivio dati di origine.
- Database SQL di Azure. Usare un database del database SQL di Azure come archivio dati sink. Se non è disponibile un database SQL, vedere Creare un database del database SQL di Azure per crearne uno.
Creare le tabelle di origine nel database di SQL Server
Aprire SQL Server Management Studio (SSMS) o Azure Data Studio e connettersi al database di SQL Server.
In Esplora server (SSMS) o nel riquadro Connessioni (Azure Data Studio) fare clic con il pulsante destro del mouse sul database e scegliere Nuova query.
Eseguire il comando SQL seguente sul database per creare le tabelle denominate
customer_table
eproject_table
:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime ); INSERT INTO customer_table (PersonID, Name, LastModifytime) VALUES (1, 'John','9/1/2017 12:56:00 AM'), (2, 'Mike','9/2/2017 5:23:00 AM'), (3, 'Alice','9/3/2017 2:36:00 AM'), (4, 'Andy','9/4/2017 3:21:00 AM'), (5, 'Anny','9/5/2017 8:06:00 AM'); INSERT INTO project_table (Project, Creationtime) VALUES ('project1','1/1/2015 0:00:00 AM'), ('project2','2/2/2016 1:23:00 AM'), ('project3','3/4/2017 5:16:00 AM');
Creare le tabelle di destinazione nel database SQL di Azure
Aprire SQL Server Management Studio (SSMS) o Azure Data Studio e connettersi al database di SQL Server.
In Esplora server (SSMS) o nel riquadro Connessioni (Azure Data Studio) fare clic con il pulsante destro del mouse sul database e scegliere Nuova query.
Eseguire il comando SQL seguente sul database per creare le tabelle denominate
customer_table
eproject_table
:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime );
Creare un'altra tabella nel database SQL di Azure per archiviare il valore del limite massimo
Eseguire il comando SQL seguente nel database SQL per creare una tabella denominata
watermarktable
in cui archiviare il valore limite:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );
Inserire i valori del limite iniziale per entrambe le tabelle di origine nella tabella dei limiti.
INSERT INTO watermarktable VALUES ('customer_table','1/1/2010 12:00:00 AM'), ('project_table','1/1/2010 12:00:00 AM');
Creare una stored procedure nel database SQL di Azure
Eseguire il comando seguente per creare una stored procedure nel database. Questa stored procedure aggiorna il valore del limite dopo ogni esecuzione di pipeline.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Creare tipi di dati e stored procedure aggiuntive nel database SQL di Azure
Eseguire la query seguente per creare due stored procedure e due tipi di dati nel database. Questi elementi vengono usati per unire i dati delle tabelle di origine nelle tabelle di destinazione.
Per semplificare la procedura, si usano direttamente queste stored procedure passando i dati differenziali tramite una variabile di tabella e quindi unendoli nell'archivio di destinazione. Si noti che non è prevista l'archiviazione di un numero di righe differenziali elevato (più di 100) nella variabile di tabella.
Se è necessario unire un numero elevato di righe delta nell'archivio di destinazione, è consigliabile usare un'attività di copia per copiare prima tutti i dati differenziali in una tabella temporanea nell'archivio di destinazione e quindi compilare la propria stored procedure senza usare la variabile di tabella per eseguire il merge dalla tabella temporanea alla tabella finale.
CREATE TYPE DataTypeforCustomerTable AS TABLE(
PersonID int,
Name varchar(255),
LastModifytime datetime
);
GO
CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS
BEGIN
MERGE customer_table AS target
USING @customer_table AS source
ON (target.PersonID = source.PersonID)
WHEN MATCHED THEN
UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
WHEN NOT MATCHED THEN
INSERT (PersonID, Name, LastModifytime)
VALUES (source.PersonID, source.Name, source.LastModifytime);
END
GO
CREATE TYPE DataTypeforProjectTable AS TABLE(
Project varchar(255),
Creationtime datetime
);
GO
CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS
BEGIN
MERGE project_table AS target
USING @project_table AS source
ON (target.Project = source.Project)
WHEN MATCHED THEN
UPDATE SET Creationtime = source.Creationtime
WHEN NOT MATCHED THEN
INSERT (Project, Creationtime)
VALUES (source.Project, source.Creationtime);
END
Azure PowerShell
Installare i moduli di Azure PowerShell più recenti seguendo le istruzioni descritte in Installare e configurare Azure PowerShell.
Creare una data factory
Definire una variabile per il nome del gruppo di risorse usato in seguito nei comandi di PowerShell. Copiare il testo del comando seguente in PowerShell, specificare un nome per il gruppo di risorse di Azure tra virgolette doppie e quindi eseguire il comando. Un esempio è
"adfrg"
.$resourceGroupName = "ADFTutorialResourceGroup";
Se il gruppo di risorse esiste già, potrebbe essere preferibile non sovrascriverlo. Assegnare un valore diverso alla variabile
$resourceGroupName
ed eseguire di nuovo il comando.Definire una variabile per la località della data factory.
$location = "East US"
Per creare il gruppo di risorse di Azure, eseguire questo comando:
New-AzResourceGroup $resourceGroupName $location
Se il gruppo di risorse esiste già, potrebbe essere preferibile non sovrascriverlo. Assegnare un valore diverso alla variabile
$resourceGroupName
ed eseguire di nuovo il comando.Definire una variabile per il nome della data factory.
Importante
Aggiornare il nome della data factory in modo che sia univoco a livello globale, Un esempio è ADFIncMultiCopyTutorialFactorySP1127.
$dataFactoryName = "ADFIncMultiCopyTutorialFactory";
Per creare la data factory, eseguire questo cmdlet Set-AzDataFactoryV2:
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
Notare i punti seguenti:
Il nome della data factory deve essere univoco a livello globale. Se viene visualizzato l'errore seguente, modificare il nome e riprovare:
Set-AzDataFactoryV2 : HTTP Status Code: Conflict Error Code: DataFactoryNameInUse Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
Per creare istanze di Data Factory, l'account utente usato per accedere ad Azure deve essere un membro dei ruoli collaboratore o proprietario oppure un amministratore della sottoscrizione di Azure.
Per un elenco di aree di Azure in cui Data Factory è attualmente disponibile, selezionare le aree di interesse nella pagina seguente, quindi espandere Analytics per individuare Data Factory: Prodotti disponibili in base all'area. Gli archivi dati (Archiviazione di Azure, database SQL, Istanza gestita di SQL e così via) e le risorse di calcolo (Azure HDInsight e così via) usati dalla data factory possono trovarsi in altre aree.
Creare un runtime di integrazione self-hosted
In questa sezione si crea un runtime di integrazione self-hosted e lo si associa a un computer locale con il database di SQL Server. Il runtime di integrazione self-hosted è il componente che copia i dati dall'istanza di SQL Server nel computer al database SQL di Azure.
Creare una variabile per il nome del runtime di integrazione. Usare un nome univoco e prenderne nota. perché sarà usato più avanti in questa esercitazione.
$integrationRuntimeName = "ADFTutorialIR"
Creare un runtime di integrazione self-hosted.
Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
Di seguito è riportato l'output di esempio:
Name : <Integration Runtime name> Type : SelfHosted ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
Eseguire questo comando per recuperare lo stato del runtime di integrazione creato. Verificare che il valore della proprietà State sia impostato su NeedRegistration.
Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
Di seguito è riportato l'output di esempio:
State : NeedRegistration Version : CreateTime : 9/24/2019 6:00:00 AM AutoUpdate : On ScheduledUpdateDate : UpdateDelayOffset : LocalTimeZoneOffset : InternalChannelEncryption : Capabilities : {} ServiceUrls : {eu.frontend.clouddatahub.net} Nodes : {} Links : {} Name : ADFTutorialIR Type : SelfHosted ResourceGroupName : <ResourceGroup name> DataFactoryName : <DataFactory name> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
Eseguire questo comando per recuperare le chiavi di autenticazione per la registrazione del runtime di integrazione self-hosted nel servizio Azure Data Factory nel cloud:
Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
Di seguito è riportato l'output di esempio:
{ "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=", "AuthKey2": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy=" }
Copiare una delle chiavi (escluse le virgolette) usate per la registrazione del runtime di integrazione self-hosted che si installerà nel computer nei passaggi successivi.
Installare lo strumento del runtime di integrazione
Se il runtime di integrazione è già installato nel computer, disinstallarlo tramite Installazione applicazioni.
Scaricare il runtime di integrazione self-hosted in un computer Windows locale. Eseguire l'installazione.
Nella pagina dell'installazione guidata di Microsoft Integration Runtime fare clic su Avanti.
Nella pagina Contratto di licenza con l'utente finale accettare le condizioni e fare clic su Avanti.
Nella pagina Cartella di destinazione fare clic su Avanti.
Nella pagina Pronto per l'installazione fare clic su Installa.
Nella pagina Installazione di Microsoft Integration Runtime completata fare clic su Fine.
Nella pagina Registra Runtime di integrazione (self-hosted) incollare la chiave salvata nella sezione precedente e fare clic su Registra.
Nella pagina Nuovo nodo di Integration Runtime (self-hosted) fare clic su Fine.
Al termine della registrazione del runtime di integrazione self-hosted viene visualizzato il messaggio seguente:
Nella pagina Registra Runtime di integrazione (self-hosted) fare clic su Avvia Configuration Manager.
Quando il nodo viene connesso al servizio cloud, viene visualizzata la pagina seguente:
Testare ora la connettività al database di SQL Server.
a. Nella pagina Configuration Manager passare alla scheda Diagnostica.
b. Selezionare SqlServer per il tipo di origine dati.
c. Immettere il nome del server.
d. Immettere il nome del database.
e. Selezionare la modalità di autenticazione.
f. Immettere il nome utente.
g. Immettere la password associata al nome utente.
h. Selezionare Test per verificare che il runtime di integrazione possa connettersi a SQL Server. Se la connessione ha esito positivo, viene visualizzato un segno di spunta verde. Se la connessione non ha esito positivo, viene visualizzato un messaggio di errore. Risolvere eventuali problemi e assicurarsi che il runtime di integrazione possa connettesi a SQL Server.
Nota
Prendere nota di questi valori per tipo di autenticazione, server, database, utente e password, perché saranno usati più avanti in questa esercitazione.
Creare servizi collegati
Si creano servizi collegati in una data factory per collegare gli archivi dati e i servizi di calcolo alla data factory. In questa sezione vengono creati i servizi collegati al database di SQL Server e al database nel database SQL di Azure.
Creare il servizio collegato di SQL Server
In questo passaggio si collega il database di SQL Server alla data factory.
Nella cartella C:\ADFTutorials\IncCopyMultiTableTutorial (creare le cartelle locali se non sono già esistenti) creare un file JSON denominato SqlServerLinkedService.json con il contenuto seguente. Selezionare la sezione corretta in base all'autenticazione usata per connettersi a SQL Server.
Importante
Selezionare la sezione corretta in base all'autenticazione usata per connettersi a SQL Server.
Se si usa l'autenticazione SQL, copiare la definizione JSON seguente:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>" }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }
Se si usa l'autenticazione Windows, copiare la definizione JSON seguente:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>", "userName":"<username> or <domain>\\<username>", "password":{ "type":"SecureString", "value":"<password>" } }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }
Importante
- Selezionare la sezione corretta in base all'autenticazione usata per connettersi a SQL Server.
- Sostituire <il nome> del runtime di integrazione con il nome del runtime di integrazione.
- Sostituire <servername>, <databasename>, <username> e <password> con i valori del database di SQL Server prima di salvare il file.
- Se è necessario usare una barra (
\
) nell'account utente o nel nome del server, usare il carattere di escape (\
). Un esempio èmydomain\\myuser
.
In PowerShell eseguire il cmdlet seguente per passare alla cartella C:\ADFTutorials\IncCopyMultiTableTutorial.
Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
Eseguire il cmdlet Set-AzDataFactoryV2LinkedService per creare il servizio collegato AzureStorageLinkedService. Nell'esempio seguente si passano i valori per i parametri ResourceGroupName e DataFactoryName:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
Di seguito è riportato l'output di esempio:
LinkedServiceName : SqlServerLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
Creare il servizio collegato per il database SQL
Nella cartella C:\ADFTutorials\IncCopyMultiTableTutorial creare un file JSON denominato AzureSQLDatabaseLinkedService.json con il contenuto seguente. Se non esiste già, creare la cartella ADF. Sostituire <nome server>, <nome> database, <nome> utente e <password> con il nome del database DI SQL Server, il nome del database, il nome utente e la password prima di salvare il file.
{ "name":"AzureSQLDatabaseLinkedService", "properties":{ "annotations":[ ], "type":"AzureSqlDatabase", "typeProperties":{ "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;" } } }
In PowerShell eseguire il cmdlet Set-AzDataFactoryV2LinkedService per creare il servizio collegato AzureSQLDatabaseLinkedService.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
Di seguito è riportato l'output di esempio:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
Creare i set di dati
In questo passaggio vengono creati i set di dati per rappresentare l'origine dati, la destinazione dati e la posizione di archiviazione del valore limite.
Creare set di dati di origine
Creare un file JSON denominato SourceDataset.json nella stessa cartella con il contenuto seguente:
{ "name":"SourceDataset", "properties":{ "linkedServiceName":{ "referenceName":"SqlServerLinkedService", "type":"LinkedServiceReference" }, "annotations":[ ], "type":"SqlServerTable", "schema":[ ] } }
L'attività di copia nella pipeline usa una query SQL per caricare i dati invece di caricare l'intera tabella.
Eseguire il cmdlet Set-AzDataFactoryV2Dataset per creare il set di dati SourceDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
Ecco l'output di esempio del cmdlet:
DatasetName : SourceDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
Creare un set di dati sink
Nella stessa cartella creare un file JSON denominato SinkDataset.json con il contenuto seguente. L'elemento tableName viene impostato dalla pipeline in modo dinamico in fase di esecuzione. L'attività ForEach nella pipeline esegue l'iterazione di un elenco di nomi di tabella e passa il nome di tabella a questo set di dati in ogni interazione.
{ "name":"SinkDataset", "properties":{ "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" }, "parameters":{ "SinkTableName":{ "type":"String" } }, "annotations":[ ], "type":"AzureSqlTable", "typeProperties":{ "tableName":{ "value":"@dataset().SinkTableName", "type":"Expression" } } } }
Eseguire il cmdlet Set-AzDataFactoryV2Dataset per creare il set di dati SinkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
Ecco l'output di esempio del cmdlet:
DatasetName : SinkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Creare un set di dati per un limite
In questo passaggio si crea un set di dati per l'archiviazione di un valore limite massimo.
Nella stessa cartella creare un file JSON denominato WatermarkDataset.json con il contenuto seguente:
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
Eseguire il cmdlet Set-AzDataFactoryV2Dataset per creare il set di dati WatermarkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
Ecco l'output di esempio del cmdlet:
DatasetName : WatermarkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Creare una pipeline
Questa pipeline accetta un elenco di nomi di tabella come parametro. L'attività ForEach esegue l'iterazione dell'elenco dei nomi di tabella ed esegue le operazioni seguenti:
Usa l'attività di ricerca per recuperare il valore del limite precedente (valore iniziale o usato nell'ultima iterazione).
Usa l'attività di ricerca per recuperare il nuovo valore del limite (valore massimo della colonna dei limiti nella tabella di origine).
Usa l'attività di copia per copiare dati tra i due valori del limite dal database di origine al database di destinazione.
Usa l'attività StoredProcedure per aggiornare il valore del limite precedente da usare nel primo passaggio dell'iterazione successiva.
Creare la pipeline
Nella stessa cartella creare un file JSON denominato IncrementalCopyPipeline.json con il contenuto seguente:
{ "name":"IncrementalCopyPipeline", "properties":{ "activities":[ { "name":"IterateSQLTables", "type":"ForEach", "dependsOn":[ ], "userProperties":[ ], "typeProperties":{ "items":{ "value":"@pipeline().parameters.tableList", "type":"Expression" }, "isSequential":false, "activities":[ { "name":"LookupOldWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"AzureSqlSource", "sqlReaderQuery":{ "value":"select * from watermarktable where TableName = '@{item().TABLE_NAME}'", "type":"Expression" } }, "dataset":{ "referenceName":"WatermarkDataset", "type":"DatasetReference" } } }, { "name":"LookupNewWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}", "type":"Expression" } }, "dataset":{ "referenceName":"SourceDataset", "type":"DatasetReference" }, "firstRowOnly":true } }, { "name":"IncrementalCopyActivity", "type":"Copy", "dependsOn":[ { "activity":"LookupOldWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] }, { "activity":"LookupNewWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'", "type":"Expression" } }, "sink":{ "type":"AzureSqlSink", "sqlWriterStoredProcedureName":{ "value":"@{item().StoredProcedureNameForMergeOperation}", "type":"Expression" }, "sqlWriterTableType":{ "value":"@{item().TableType}", "type":"Expression" }, "storedProcedureTableTypeParameterName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" }, "disableMetricsCollection":false }, "enableStaging":false }, "inputs":[ { "referenceName":"SourceDataset", "type":"DatasetReference" } ], "outputs":[ { "referenceName":"SinkDataset", "type":"DatasetReference", "parameters":{ "SinkTableName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" } } } ] }, { "name":"StoredProceduretoWriteWatermarkActivity", "type":"SqlServerStoredProcedure", "dependsOn":[ { "activity":"IncrementalCopyActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "storedProcedureName":"[dbo].[usp_write_watermark]", "storedProcedureParameters":{ "LastModifiedtime":{ "value":{ "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type":"Expression" }, "type":"DateTime" }, "TableName":{ "value":{ "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"Expression" }, "type":"String" } } }, "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" } } ] } } ], "parameters":{ "tableList":{ "type":"array" } }, "annotations":[ ] } }
Eseguire il cmdlet Set-AzDataFactoryV2Pipeline per creare la pipeline IncrementalCopyPipeline.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
Di seguito è riportato l'output di esempio:
PipelineName : IncrementalCopyPipeline ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Activities : {IterateSQLTables} Parameters : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
Eseguire la pipeline
Nella stessa cartella creare un file di parametri denominato Parameters.json con il contenuto seguente:
{ "tableList": [ { "TABLE_NAME": "customer_table", "WaterMark_Column": "LastModifytime", "TableType": "DataTypeforCustomerTable", "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table" }, { "TABLE_NAME": "project_table", "WaterMark_Column": "Creationtime", "TableType": "DataTypeforProjectTable", "StoredProcedureNameForMergeOperation": "usp_upsert_project_table" } ] }
Eseguire la pipeline IncrementalCopyPipeline usando il cmdlet Invoke-AzDataFactoryV2Pipeline. Sostituire i segnaposto con il nome del gruppo di risorse e della data factory.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
Monitorare la pipeline
Accedere al portale di Azure.
Selezionare Tutti i servizi, eseguire una ricerca con la parola chiave Data factory e selezionare Data factory.
Cercare la propria data factory nell'elenco delle data factory e selezionarla per aprire la pagina Data factory.
Nella pagina Data factory selezionare Apri nel riquadro Apri Azure Data Factory Studio per avviare Azure Data Factory in una scheda separata.
Nella home page di Azure Data Factory selezionare Monitoraggio sul lato sinistro.
È possibile visualizzare tutte le esecuzioni di pipeline e i rispettivi stati. Notare che nell'esempio seguente lo stato dell'esecuzione della pipeline è Riuscito. Per verificare i parametri passati alla pipeline, selezionare il collegamento nella colonna Parametri. Se si è verificato un errore, verrà visualizzato un collegamento nella colonna Errore.
Quando si seleziona il collegamento nella colonna Azioni, vengono visualizzate tutte le esecuzioni di attività per la pipeline.
Per tornare alla visualizzazione Pipeline Runs (Esecuzioni di pipeline), selezionare Tutte le esecuzioni della pipeline.
Esaminare i risultati
In SQL Server Management Studio eseguire queste query sul database SQL di destinazione per verificare che i dati siano stati copiati dalle tabelle di origine alle tabelle di destinazione:
Query
select * from customer_table
Output
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 Alice 2017-09-03 02:36:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
Query
select * from project_table
Output
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
Query
select * from watermarktable
Output
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-05 08:06:00.000
project_table 2017-03-04 05:16:00.000
Notare che i valori del limite per entrambe le tabelle sono stati aggiornati.
Aggiungere altri dati alle tabelle di origine
Eseguire questa query sul database di SQL Server di origine per aggiornare una riga esistente in customer_table. Inserire una nuova riga in project_table.
UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3
INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');
Eseguire di nuovo la pipeline
Eseguire di nuovo la pipeline con il comando di PowerShell seguente:
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
Monitorare le esecuzioni di pipeline seguendo le istruzioni contenute nella sezione Monitorare la pipeline. Quando lo stato della pipeline è In corso, viene visualizzato un altro collegamento a un'azione in Azioni per annullare l'esecuzione della pipeline.
Selezionare Aggiorna per aggiornare l'elenco finché l'esecuzione della pipeline non riesce.
Fare eventualmente clic sul collegamento Visualizza esecuzioni attività in Azioni per visualizzare tutte le esecuzioni di attività associate a questa esecuzione di pipeline.
Esaminare i risultati finali
In SQL Server Management Studio eseguire le query seguenti sul database di destinazione per verificare che i dati nuovi/aggiornati siano stati copiati dalle tabelle di origine alle tabelle di destinazione.
Query
select * from customer_table
Output
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 NewName 2017-09-08 00:00:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
Notare i nuovi valori di Name e LastModifytime per PersonID per il numero 3.
Query
select * from project_table
Output
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
NewProject 2017-10-01 00:00:00.000
Si noti che è stata aggiunta la voce NewProject a project_table.
Query
select * from watermarktable
Output
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-08 00:00:00.000
project_table 2017-10-01 00:00:00.000
Notare che i valori del limite per entrambe le tabelle sono stati aggiornati.
Contenuto correlato
In questa esercitazione sono stati eseguiti i passaggi seguenti:
- Preparare gli archivi dati di origine e di destinazione.
- Creare una data factory.
- Creare un runtime di integrazione self-hosted.
- Installare il runtime di integrazione.
- Creare servizi collegati.
- Creare i set di dati di origine, sink e limite.
- Creare, eseguire e monitorare una pipeline.
- Esaminare i risultati.
- Aggiungere o aggiornare dati nelle tabelle di origine.
- Rieseguire e monitorare la pipeline.
- Esaminare i risultati finali.
Passare all'esercitazione successiva per informazioni sulla trasformazione dei dati usando un cluster Spark in Azure: