Caricare i dati in modo incrementale dal database SQL di Azure ad Archiviazione BLOB di Azure con PowerShell
SI APPLICA A: Azure Data Factory Azure Synapse Analytics
Suggerimento
Provare Data Factory in Microsoft Fabric, una soluzione di analisi all-in-one per le aziende. Microsoft Fabric copre tutto, dallo spostamento dati al data science, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Vedere le informazioni su come iniziare una nuova prova gratuita!
In questa esercitazione si usa Azure Data Factory per creare una pipeline che carica dati differenziali da una tabella in database SQL di Azure all'archivio BLOB di Azure.
In questa esercitazione vengono completati i passaggi seguenti:
- Preparare l'archivio dati per l'archiviazione del valore limite.
- Creare una data factory.
- Creare servizi collegati.
- Creare i set di dati di origine, sink e limite.
- Creare una pipeline.
- Esegui la pipeline.
- Monitorare l'esecuzione della pipeline.
Panoramica
Il diagramma generale della soluzione è il seguente:
Di seguito sono descritti i passaggi fondamentali per la creazione di questa soluzione:
Selezionare la colonna del limite. Selezionare una colonna nell'archivio dati di origine, che può essere usata per analizzare approfonditamente i record nuovi o aggiornati per ogni esecuzione. I dati della colonna selezionata (ad esempio last_modify_time o ID) continuano in genere ad aumentare quando le righe vengono create o aggiornate. Il valore massimo di questa colonna viene usato come limite.
Preparare un archivio dati per l'archiviazione del valore limite.
In questa esercitazione si archivia il valore limite in un database SQL.Creare una pipeline con il flusso di lavoro seguente:
La pipeline di questa soluzione contiene le attività seguenti:
- Creare due attività di ricerca. Usare la prima attività di ricerca per recuperare l'ultimo valore limite. Usare la seconda attività di ricerca per recuperare il nuovo valore limite. Questi valori limite vengono passati all'attività di copia.
- Creare un attività Copy che copia le righe dall'archivio dati di origine con il valore della colonna filigrana maggiore del valore limite precedente e minore o uguale al nuovo valore limite. L'attività copierà quindi i dati delta dall'archivio dati di origine a un archivio BLOB come nuovo file.
- Creare un'attività stored procedure che aggiorni il valore limite per la pipeline che verrà eseguita la volta successiva.
Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
Prerequisiti
Nota
È consigliabile usare il modulo Azure Az PowerShell per interagire con Azure. Per iniziare, vedere Installare Azure PowerShell. Per informazioni su come eseguire la migrazione al modulo AZ PowerShell, vedere Eseguire la migrazione di Azure PowerShell da AzureRM ad Az.
- Database SQL di Azure. Usare il database come archivio dati di origine. Se non si ha un database in Database SQL di Azure, vedere la procedura per crearne uno descritta in Creare un database in Database SQL di Azure.
- Archiviazione di Azure. Usare l'archivio BLOB come archivio dati sink. Se non si ha un account di archiviazione, vedere Creare un account di archiviazione per informazioni su come crearne uno. Creare un contenitore denominato adftutorial.
- Azure PowerShell. Seguire le istruzioni descritte in Installare e configurare Azure PowerShell.
Creare una tabella di origine dati nel database SQL
Aprire SQL Server Management Studio. In Esplora server fare clic con il pulsante destro del mouse sul database e scegliere Nuova query.
Eseguire questo comando SQL sul database SQL per creare una tabella denominata
data_source_table
come archivio dell'origine dati:create table data_source_table ( PersonID int, Name varchar(255), LastModifytime datetime ); INSERT INTO data_source_table (PersonID, Name, LastModifytime) VALUES (1, 'aaaa','9/1/2017 12:56:00 AM'), (2, 'bbbb','9/2/2017 5:23:00 AM'), (3, 'cccc','9/3/2017 2:36:00 AM'), (4, 'dddd','9/4/2017 3:21:00 AM'), (5, 'eeee','9/5/2017 8:06:00 AM');
In questa esercitazione, si usa LastModifytime come colonna del valore limite. I dati nell'archivio dell'origine dati sono visualizzati nella tabella seguente:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000
Creare un'altra tabella nel database SQL per archiviare il valore del limite massimo
Eseguire questo comando SQL sul database SQL per creare una tabella denominata
watermarktable
in cui archiviare il valore limite:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );
Impostare il valore predefinito del limite massimo con il nome tabella dell'archivio dei dati di origine. In questa esercitazione, il nome della tabella è data_source_table.
INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Esaminare i dati nella tabella
watermarktable
.Select * from watermarktable
Output:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
Creare una stored procedure nel database SQL
Eseguire questo comando per creare una stored procedure nel database SQL:
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Creare una data factory
Definire una variabile per il nome del gruppo di risorse usato in seguito nei comandi di PowerShell. Copiare il testo del comando seguente in PowerShell, specificare un nome per il gruppo di risorse di Azure tra virgolette doppie e quindi eseguire il comando. Un esempio è
"adfrg"
.$resourceGroupName = "ADFTutorialResourceGroup";
Se il gruppo di risorse esiste già, potrebbe essere preferibile non sovrascriverlo. Assegnare un valore diverso alla variabile
$resourceGroupName
ed eseguire di nuovo il comando.Definire una variabile per la località della data factory.
$location = "East US"
Per creare il gruppo di risorse di Azure, eseguire questo comando:
New-AzResourceGroup $resourceGroupName $location
Se il gruppo di risorse esiste già, potrebbe essere preferibile non sovrascriverlo. Assegnare un valore diverso alla variabile
$resourceGroupName
ed eseguire di nuovo il comando.Definire una variabile per il nome della data factory.
Importante
Aggiornare il nome della data factory in modo che sia univoco a livello globale, ad esempio ADFTutorialFactorySP1127.
$dataFactoryName = "ADFIncCopyTutorialFactory";
Per creare la data factory, eseguire questo cmdlet Set-AzDataFactoryV2:
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName
Notare i punti seguenti:
Il nome della data factory deve essere univoco a livello globale. Se viene visualizzato l'errore seguente, modificare il nome e riprovare:
The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
Per creare istanze di Data Factory, l'account utente usato per accedere ad Azure deve essere un membro dei ruoli collaboratore o proprietario oppure un amministratore della sottoscrizione di Azure.
Per un elenco di aree di Azure in cui Data Factory è attualmente disponibile, selezionare le aree di interesse nella pagina seguente, quindi espandere Analytics per individuare Data Factory: Prodotti disponibili in base all'area. Gli archivi dati (Archiviazione, database SQL di Azure, Istanza gestita di SQL di Azure e così via) e le risorse di calcolo (Azure HDInsight e così via) usati dalla data factory possono trovarsi in altre aree.
Creare servizi collegati
Si creano servizi collegati in una data factory per collegare gli archivi dati e i servizi di calcolo alla data factory. In questa sezione vengono creati i servizi collegati all'account di archiviazione e al database SQL.
Creare un servizio collegato di archiviazione
Creare un file JSON denominato AzureStorageLinkedService.json nella cartella C:\ADF con il contenuto seguente. Se non esiste già, creare la cartella ADF. Sostituire
<accountName>
e<accountKey>
con il nome e la chiave dell'account di archiviazione prima di salvare il file.{ "name": "AzureStorageLinkedService", "properties": { "type": "AzureStorage", "typeProperties": { "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>" } } }
In PowerShell passare alla cartella ADF.
Eseguire il cmdlet Set-AzDataFactoryV2LinkedService per creare il servizio collegato AzureStorageLinkedService. Nell'esempio seguente si passano i valori per i parametri ResourceGroupName e DataFactoryName:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
Di seguito è riportato l'output di esempio:
LinkedServiceName : AzureStorageLinkedService ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
Creare un servizio collegato per il database SQL
Creare un file JSON denominato AzureSQLDatabaseLinkedService.json nella cartella C:\ADF con il contenuto seguente. Se non esiste già, creare la cartella ADF. Sostituire <your-server-name> e <your-database-name> con il nome del server e del database prima di salvare il file. È anche necessario configurare Sql Server di Azure per concedere l'accesso all'identità gestita della data factory.
{ "name": "AzureSqlDatabaseLinkedService", "properties": { "type": "AzureSqlDatabase", "typeProperties": { "connectionString": "Server=tcp:<your-server-name>.database.windows.net,1433;Database=<your-database-name>;" }, "authenticationType": "ManagedIdentity", "annotations": [] } }
In PowerShell passare alla cartella ADF.
Eseguire il cmdlet Set-AzDataFactoryV2LinkedService per creare il servizio collegato AzureSQLDatabaseLinkedService.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
Di seguito è riportato l'output di esempio:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService ProvisioningState :
Creare i set di dati
In questo passaggio vengono creati set di dati per rappresentare i dati di source e sink.
Creare set di dati di origine
Creare un file JSON denominato SourceDataset.json nella stessa cartella con il contenuto seguente:
{ "name": "SourceDataset", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "data_source_table" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
In questa esercitazione si usa il nome di tabella data_source_table. Sostituirlo se si usa una tabella con un nome diverso.
Eseguire il cmdlet Set-AzDataFactoryV2Dataset per creare il set di dati SourceDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
Ecco l'output di esempio del cmdlet:
DatasetName : SourceDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Creare un set di dati sink
Creare un file JSON denominato SinkDataset.json nella stessa cartella con il contenuto seguente:
{ "name": "SinkDataset", "properties": { "type": "AzureBlob", "typeProperties": { "folderPath": "adftutorial/incrementalcopy", "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')", "format": { "type": "TextFormat" } }, "linkedServiceName": { "referenceName": "AzureStorageLinkedService", "type": "LinkedServiceReference" } } }
Importante
Questo frammento di codice presuppone che nell'archivio BLOB sia presente un contenitore BLOB denominato
adftutorial
. Creare il contenitore se non esiste oppure impostare il nome di un contenitore esistente. La cartella di outputincrementalcopy
viene creata automaticamente se non esiste nel contenitore. In questa esercitazione, il nome del file viene generato in modo dinamico con l'espressione@CONCAT('Incremental-', pipeline().RunId, '.txt')
.Eseguire il cmdlet Set-AzDataFactoryV2Dataset per creare il set di dati SinkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
Ecco l'output di esempio del cmdlet:
DatasetName : SinkDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
Creare un set di dati per un limite
In questo passaggio si crea un set di dati per l'archiviazione di un valore limite massimo.
Nella stessa cartella creare un file JSON denominato WatermarkDataset.json con il contenuto seguente:
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
Eseguire il cmdlet Set-AzDataFactoryV2Dataset per creare il set di dati WatermarkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
Ecco l'output di esempio del cmdlet:
DatasetName : WatermarkDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Creare una pipeline
In questa esercitazione si crea una pipeline con due attività di ricerca, un'attività di copia e un'attività di stored procedure concatenate in una pipeline.
Creare un file JSON denominato IncrementalCopyPipeline.json nella stessa cartella con il contenuto seguente:
{ "name": "IncrementalCopyPipeline", "properties": { "activities": [ { "name": "LookupOldWaterMarkActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from watermarktable" }, "dataset": { "referenceName": "WatermarkDataset", "type": "DatasetReference" } } }, { "name": "LookupNewWaterMarkActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table" }, "dataset": { "referenceName": "SourceDataset", "type": "DatasetReference" } } }, { "name": "IncrementalCopyActivity", "type": "Copy", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'" }, "sink": { "type": "BlobSink" } }, "dependsOn": [ { "activity": "LookupNewWaterMarkActivity", "dependencyConditions": [ "Succeeded" ] }, { "activity": "LookupOldWaterMarkActivity", "dependencyConditions": [ "Succeeded" ] } ], "inputs": [ { "referenceName": "SourceDataset", "type": "DatasetReference" } ], "outputs": [ { "referenceName": "SinkDataset", "type": "DatasetReference" } ] }, { "name": "StoredProceduretoWriteWatermarkActivity", "type": "SqlServerStoredProcedure", "typeProperties": { "storedProcedureName": "usp_write_watermark", "storedProcedureParameters": { "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" }, "TableName": { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"} } }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" }, "dependsOn": [ { "activity": "IncrementalCopyActivity", "dependencyConditions": [ "Succeeded" ] } ] } ] } }
Eseguire il cmdlet Set-AzDataFactoryV2Pipeline per creare la pipeline IncrementalCopyPipeline.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
Di seguito è riportato l'output di esempio:
PipelineName : IncrementalCopyPipeline ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Activities : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity} Parameters :
Eseguire la pipeline
Eseguire la pipeline IncrementalCopyPipeline usando il cmdlet Invoke-AzDataFactoryV2Pipeline. Sostituire i segnaposto con il nome del gruppo di risorse e della data factory.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
Verificare lo stato della pipeline eseguendo il cmdlet Get-AzDataFactoryV2ActivityRun fino a visualizzare tutte le attività correttamente in esecuzione. Sostituire i segnaposto con la data appropriata per i parametri RunStartedAfter e RunStartedBefore. In questa esercitazione si usa -RunStartedAfter "2017/09/14" e -RunStartedBefore "2017/09/15".
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
Di seguito è riportato l'output di esempio:
ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupNewWaterMarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {NewWatermarkvalue} LinkedServiceName : ActivityRunStart : 9/14/2017 7:42:42 AM ActivityRunEnd : 9/14/2017 7:42:50 AM DurationInMs : 7777 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupOldWaterMarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {TableName, WatermarkValue} LinkedServiceName : ActivityRunStart : 9/14/2017 7:42:42 AM ActivityRunEnd : 9/14/2017 7:43:07 AM DurationInMs : 25437 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : IncrementalCopyActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, sink} Output : {dataRead, dataWritten, rowsCopied, copyDuration...} LinkedServiceName : ActivityRunStart : 9/14/2017 7:43:10 AM ActivityRunEnd : 9/14/2017 7:43:29 AM DurationInMs : 19769 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : StoredProceduretoWriteWatermarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {storedProcedureName, storedProcedureParameters} Output : {} LinkedServiceName : ActivityRunStart : 9/14/2017 7:43:32 AM ActivityRunEnd : 9/14/2017 7:43:47 AM DurationInMs : 14467 Status : Succeeded Error : {errorCode, message, failureType, target}
Esaminare i risultati
Nell'archivio BLOB (archivio sink), si noterà che i dati sono stati copiati nel file definito in SinkDataset. Nell'esercitazione corrente il nome del file è
Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt
. Aprendo il file si può notare che i record corrispondono ai dati presenti nel database SQL.1,aaaa,2017-09-01 00:56:00.0000000 2,bbbb,2017-09-02 05:23:00.0000000 3,cccc,2017-09-03 02:36:00.0000000 4,dddd,2017-09-04 03:21:00.0000000 5,eeee,2017-09-05 08:06:00.0000000
Verificare il valore più recente da
watermarktable
. Si noterà che il valore limite è stato aggiornato.Select * from watermarktable
Di seguito è riportato l'output di esempio:
TableName WatermarkValue data_source_table 2017-09-05 8:06:00.000
Inserire i dati nell'archivio dell'origine dati per verificare il caricamento dei dati delta
Inserire nuovi dati nel database SQL (archivio dell'origine dati).
INSERT INTO data_source_table VALUES (6, 'newdata','9/6/2017 2:23:00 AM') INSERT INTO data_source_table VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
I dati aggiornati nel database SQL sono i seguenti:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000 6 | newdata | 2017-09-06 02:23:00.000 7 | newdata | 2017-09-07 09:01:00.000
Eseguire di nuovo la pipeline IncrementalCopyPipeline usando il cmdlet Invoke-AzDataFactoryV2Pipeline. Sostituire i segnaposto con il nome del gruppo di risorse e della data factory.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
Verificare lo stato della pipeline eseguendo il cmdlet Get-AzDataFactoryV2ActivityRun fino a visualizzare tutte le attività correttamente in esecuzione. Sostituire i segnaposto con la data appropriata per i parametri RunStartedAfter e RunStartedBefore. In questa esercitazione si usa -RunStartedAfter "2017/09/14" e -RunStartedBefore "2017/09/15".
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
Di seguito è riportato l'output di esempio:
ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupNewWaterMarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {NewWatermarkvalue} LinkedServiceName : ActivityRunStart : 9/14/2017 8:52:26 AM ActivityRunEnd : 9/14/2017 8:52:58 AM DurationInMs : 31758 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupOldWaterMarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {TableName, WatermarkValue} LinkedServiceName : ActivityRunStart : 9/14/2017 8:52:26 AM ActivityRunEnd : 9/14/2017 8:52:52 AM DurationInMs : 25497 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : IncrementalCopyActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, sink} Output : {dataRead, dataWritten, rowsCopied, copyDuration...} LinkedServiceName : ActivityRunStart : 9/14/2017 8:53:00 AM ActivityRunEnd : 9/14/2017 8:53:20 AM DurationInMs : 20194 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : StoredProceduretoWriteWatermarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {storedProcedureName, storedProcedureParameters} Output : {} LinkedServiceName : ActivityRunStart : 9/14/2017 8:53:23 AM ActivityRunEnd : 9/14/2017 8:53:41 AM DurationInMs : 18502 Status : Succeeded Error : {errorCode, message, failureType, target}
Nell'archivio BLOB è stato creato un altro file. In questa esercitazione, il nome del nuovo file è
Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt
. Aprendo il file si noteranno due righe di record.Verificare il valore più recente da
watermarktable
. Si noterà che il valore limite è stato aggiornato di nuovo.Select * from watermarktable
Output di esempio:
TableName WatermarkValue data_source_table 2017-09-07 09:01:00.000
Contenuto correlato
In questa esercitazione sono stati eseguiti i passaggi seguenti:
- Preparare l'archivio dati per l'archiviazione del valore limite.
- Creare una data factory.
- Creare servizi collegati.
- Creare i set di dati di origine, sink e limite.
- Creare una pipeline.
- Esegui la pipeline.
- Monitorare l'esecuzione della pipeline.
In questa esercitazione la pipeline ha copiato dati da una singola tabella del database SQL di Azure a una risorsa di archiviazione BLOB. Passare all'esercitazione successiva per informazioni sulla copia di dati da più tabelle di un database di SQL Server a un database SQL.