Przyrostowe ładowanie danych z usługi Azure SQL Database do usługi Azure Blob Storage przy użyciu programu PowerShell
DOTYCZY: Azure Data Factory Azure Synapse Analytics
Napiwek
Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !
W tym samouczku użyjesz usługi Azure Data Factory do utworzenia potoku, który ładuje dane różnicowe z tabeli w usłudze Azure SQL Database do usługi Azure Blob Storage.
Ten samouczek obejmuje następujące procedury:
- Przygotowywanie magazynu danych do przechowywania wartości limitu.
- Tworzenie fabryki danych.
- Tworzenie połączonych usług.
- Tworzenie zestawów danych źródła, ujścia i limitu.
- Tworzenie potoku.
- Uruchom potok.
- Monitorowanie uruchomienia potoku.
Omówienie
Diagram ogólny rozwiązania wygląda następująco:
Poniżej przedstawiono ważne czynności związane z tworzeniem tego rozwiązania:
Wybierz kolumnę limitu. Wybierz jedną kolumnę w magazynie danych źródłowych, która może służyć do tworzenia wycinków nowych lub zaktualizowanych rekordów dla każdego przebiegu. Zazwyczaj dane w tej wybranej kolumnie (na przykład last_modify_time lub ID) rosną wraz z tworzeniem i aktualizacją wierszy. Maksymalna wartość w tej kolumnie jest używana jako limit.
Przygotuj magazyn danych do przechowywania wartości limitu.
W tym samouczku wartość limitu jest przechowywana w bazie danych SQL.Utwórz potok z następującym przepływem pracy:
Potok w tym rozwiązaniu obejmuje następujące działania:
- Utwórz dwa działania Lookup. Użyj pierwszego działania Lookup do pobrania ostatniej wartości limitu. Użyj drugiego działania Lookup do pobrania nowej wartości limitu. Te wartości limitu są przekazywane do działania Copy.
- Utwórz działanie Kopiuj, który kopiuje wiersze ze źródłowego magazynu danych z wartością kolumny limitu większej niż stara wartość limitu i mniejsza lub równa nowej wartości limitu. Następnie kopiuje dane różnicowe ze źródłowego magazynu danych do usługi Blob Storage jako nowy plik.
- Utwórz działanie StoredProcedure, które aktualizuje wartość limitu dla potoku przy następnym uruchomieniu.
Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.
Wymagania wstępne
Uwaga
Do interakcji z platformą Azure zalecamy używanie modułu Azure Az w programie PowerShell. Zobacz Instalowanie programu Azure PowerShell, aby rozpocząć. Aby dowiedzieć się, jak przeprowadzić migrację do modułu Az PowerShell, zobacz Migracja programu Azure PowerShell z modułu AzureRM do modułu Az.
- Usługa Azure SQL Database. Baza danych jest używana jako źródłowy magazyn danych. Jeśli nie masz bazy danych w usłudze Azure SQL Database, zobacz Tworzenie bazy danych w usłudze Azure SQL Database , aby uzyskać instrukcje tworzenia bazy danych.
- Azure Storage. Magazyn obiektów blob jest używany jako magazyn danych ujścia. Jeśli nie masz konta magazynu, utwórz je, wykonując czynności przedstawione w artykule Tworzenie konta magazynu. Utwórz kontener o nazwie adftutorial.
- Azure PowerShell. Wykonaj instrukcje podane w temacie Instalowanie i konfigurowanie programu Azure PowerShell.
Tworzenie tabeli danych źródłowych w bazie danych SQL
Otwórz SQL Server Management Studio. W Eksploratorze serwera kliknij prawym przyciskiem myszy bazę danych, a następnie wybierz pozycję Nowe zapytanie.
Uruchom następujące polecenie SQL względem bazy danych SQL, aby utworzyć tabelę o nazwie
data_source_table
jako źródłowy magazyn danych:create table data_source_table ( PersonID int, Name varchar(255), LastModifytime datetime ); INSERT INTO data_source_table (PersonID, Name, LastModifytime) VALUES (1, 'aaaa','9/1/2017 12:56:00 AM'), (2, 'bbbb','9/2/2017 5:23:00 AM'), (3, 'cccc','9/3/2017 2:36:00 AM'), (4, 'dddd','9/4/2017 3:21:00 AM'), (5, 'eeee','9/5/2017 8:06:00 AM');
W tym samouczku użyjesz kolumny LastModifytime jako kolumny limitu. Dane w źródłowym magazynie danych zostały przedstawione w poniższej tabeli:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000
Tworzenie innej tabeli w bazie danych SQL do przechowywania wartości górnego limitu
Uruchom następujące polecenie SQL względem bazy danych SQL, aby utworzyć tabelę o nazwie
watermarktable
w celu przechowywania wartości limitu:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );
Ustaw domyślną wartość górnego limitu z nazwą tabeli źródłowego magazynu danych. W tym samouczku nazwa tabeli to data_source_table.
INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Sprawdź dane w tabeli
watermarktable
.Select * from watermarktable
Wyjście:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
Tworzenie procedur składowanych w bazie danych SQL
Uruchom następujące polecenie, aby utworzyć procedurę składowaną w bazie danych SQL:
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Tworzenie fabryki danych
Zdefiniuj zmienną nazwy grupy zasobów, której użyjesz później w poleceniach programu PowerShell. Skopiuj poniższy tekst polecenia do programu PowerShell, podaj nazwę grupy zasobów platformy Azure w podwójnych cudzysłowach, a następnie uruchom polecenie. Może to być na przykład
"adfrg"
.$resourceGroupName = "ADFTutorialResourceGroup";
Jeśli grupa zasobów już istnieje, jej zastąpienie może być niewskazane. Przypisz inną wartość do zmiennej
$resourceGroupName
i ponownie uruchom polecenie.Zdefiniuj zmienną lokalizacji fabryki danych.
$location = "East US"
Aby utworzyć grupę zasobów platformy Azure, uruchom następujące polecenie:
New-AzResourceGroup $resourceGroupName $location
Jeśli grupa zasobów już istnieje, jej zastąpienie może być niewskazane. Przypisz inną wartość do zmiennej
$resourceGroupName
i ponownie uruchom polecenie.Zdefiniuj zmienną nazwy fabryki danych.
Ważne
Zaktualizuj nazwę fabryki danych, aby była unikatowa w skali globalnej. Na przykład: ADFTutorialFactorySP1127.
$dataFactoryName = "ADFIncCopyTutorialFactory";
Aby utworzyć fabrykę danych, uruchom następujące polecenie cmdlet Set-AzDataFactoryV2 :
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName
Należy uwzględnić następujące informacje:
Nazwa fabryki danych musi być globalnie unikatowa. Jeśli zostanie wyświetlony następujący błąd, zmień nazwę i spróbuj ponownie:
The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
Aby utworzyć wystąpienia usługi Data Factory, konto użytkownika używane do logowania się na platformie Azure musi być członkiem roli współautora lub właściciela albo administratorem subskrypcji platformy Azure.
Aby uzyskać listę regionów platformy Azure, w których obecnie jest dostępna usługa Data Factory, wybierz dane regiony na poniższej stronie, a następnie rozwiń węzeł Analiza, aby zlokalizować pozycję Data Factory: Produkty dostępne według regionu. Magazyny danych (Storage, SQL Database, Azure SQL Managed Instance itd.) i obliczenia (Azure HDInsight itp.) używane przez fabrykę danych mogą znajdować się w innych regionach.
Tworzenie połączonych usług
Połączone usługi tworzy się w fabryce danych w celu połączenia magazynów danych i usług obliczeniowych z fabryką danych. W tej sekcji utworzysz połączone usługi z kontem magazynu i usługą SQL Database.
Tworzenie połączonej usługi Storage
W folderze C:\ADF utwórz plik JSON o nazwie AzureStorageLinkedService.json z następującą zawartością. (Utwórz folder ADF, jeśli jeszcze nie istnieje). Przed zapisaniem pliku zastąp
<accountName>
wartości i<accountKey>
nazwą i kluczem konta magazynu.{ "name": "AzureStorageLinkedService", "properties": { "type": "AzureStorage", "typeProperties": { "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>" } } }
W programie PowerShell przejdź do folderu ADF.
Uruchom polecenie cmdlet Set-AzDataFactoryV2LinkedService, aby utworzyć połączoną usługę AzureStorageLinkedService. W poniższym przykładzie przekazujesz wartości dla parametrów ResourceGroupName i DataFactoryName:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
Oto przykładowe dane wyjściowe:
LinkedServiceName : AzureStorageLinkedService ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
Tworzenie połączonej usługi bazy danych SQL
W folderze C:\ADF utwórz plik JSON o nazwie AzureSQLDatabaseLinkedService.json z następującą zawartością. (Utwórz folder ADF, jeśli jeszcze nie istnieje). Przed zapisaniem pliku zastąp <ciąg your-server-name>> i <nazwa-bazy danych nazwą serwera i bazy danych. Należy również skonfigurować program Azure SQL Server w celu udzielenia dostępu do tożsamości zarządzanej fabryki danych.
{ "name": "AzureSqlDatabaseLinkedService", "properties": { "type": "AzureSqlDatabase", "typeProperties": { "connectionString": "Server=tcp:<your-server-name>.database.windows.net,1433;Database=<your-database-name>;" }, "authenticationType": "ManagedIdentity", "annotations": [] } }
W programie PowerShell przejdź do folderu ADF.
Uruchom polecenie cmdlet Set-AzDataFactoryV2LinkedService, aby utworzyć połączoną usługę AzureSQLDatabaseLinkedService.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
Oto przykładowe dane wyjściowe:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService ProvisioningState :
Tworzenie zestawów danych
W tym kroku utworzysz zestawy danych reprezentujące dane źródłowe i ujścia.
Tworzenie zestawu danych źródłowych
Utwórz plik JSON o nazwie SourceDataset.json w tym samym folderze, o następującej zawartości:
{ "name": "SourceDataset", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "data_source_table" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
W tym samouczku użyto nazwy tabeli data_source_table. Zastąp ją, jeśli używasz tabeli o innej nazwie.
Uruchom polecenie cmdlet Set-AzDataFactoryV2Dataset, aby utworzyć zestaw danych SourceDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
Oto przykładowe dane wyjściowe polecenia cmdlet:
DatasetName : SourceDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Tworzenie ujścia zestawu danych
Utwórz plik JSON o nazwie SinkDataset.json w tym samym folderze, o następującej zawartości:
{ "name": "SinkDataset", "properties": { "type": "AzureBlob", "typeProperties": { "folderPath": "adftutorial/incrementalcopy", "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')", "format": { "type": "TextFormat" } }, "linkedServiceName": { "referenceName": "AzureStorageLinkedService", "type": "LinkedServiceReference" } } }
Ważne
W tym fragmencie kodu założono, że masz kontener obiektów blob o nazwie
adftutorial
w magazynie obiektów blob. Utwórz ten kontener, jeśli nie istnieje, lub zastąp go nazwą istniejącego kontenera. Folder wyjściowyincrementalcopy
jest tworzony automatycznie, jeśli nie występuje w kontenerze. W tym samouczku nazwa pliku jest generowana dynamicznie przy użyciu wyrażenia@CONCAT('Incremental-', pipeline().RunId, '.txt')
.Uruchom polecenie cmdlet Set-AzDataFactoryV2Dataset, aby utworzyć zestaw danych SinkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
Oto przykładowe dane wyjściowe polecenia cmdlet:
DatasetName : SinkDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
Tworzenie zestawu danych dla limitu
W tym kroku utworzysz zestaw danych do przechowywania wartości górnego limitu.
Utwórz plik JSON o nazwie WatermarkDataset.json w tym samym folderze, o następującej zawartości:
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
Uruchom polecenie cmdlet Set-AzDataFactoryV2Dataset, aby utworzyć zestaw danych WatermarkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
Oto przykładowe dane wyjściowe polecenia cmdlet:
DatasetName : WatermarkDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Tworzenie potoku
W tym samouczku utworzysz potok z dwoma działaniami Lookup, jednym działaniem Copy i jednym działaniem StoredProcedure, połączonymi w jednym potoku.
Utwórz w tym samym folderze plik JSON IncrementalCopyPipeline.json o następującej zawartości:
{ "name": "IncrementalCopyPipeline", "properties": { "activities": [ { "name": "LookupOldWaterMarkActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from watermarktable" }, "dataset": { "referenceName": "WatermarkDataset", "type": "DatasetReference" } } }, { "name": "LookupNewWaterMarkActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table" }, "dataset": { "referenceName": "SourceDataset", "type": "DatasetReference" } } }, { "name": "IncrementalCopyActivity", "type": "Copy", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'" }, "sink": { "type": "BlobSink" } }, "dependsOn": [ { "activity": "LookupNewWaterMarkActivity", "dependencyConditions": [ "Succeeded" ] }, { "activity": "LookupOldWaterMarkActivity", "dependencyConditions": [ "Succeeded" ] } ], "inputs": [ { "referenceName": "SourceDataset", "type": "DatasetReference" } ], "outputs": [ { "referenceName": "SinkDataset", "type": "DatasetReference" } ] }, { "name": "StoredProceduretoWriteWatermarkActivity", "type": "SqlServerStoredProcedure", "typeProperties": { "storedProcedureName": "usp_write_watermark", "storedProcedureParameters": { "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" }, "TableName": { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"} } }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" }, "dependsOn": [ { "activity": "IncrementalCopyActivity", "dependencyConditions": [ "Succeeded" ] } ] } ] } }
Uruchom polecenie cmdlet Set-AzDataFactoryV2Pipeline, aby utworzyć potok IncrementalCopyPipeline.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
Oto przykładowe dane wyjściowe:
PipelineName : IncrementalCopyPipeline ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Activities : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity} Parameters :
Uruchamianie potoku
Uruchom potok IncrementalCopyPipeline przy użyciu polecenia cmdlet Invoke-AzDataFactoryV2Pipeline . Zastąp symbole zastępcze własną nazwą grupy zasobów i fabryki danych.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
Sprawdź stan potoku, uruchamiając polecenie cmdlet Get-AzDataFactoryV2ActivityRun do momentu pomyślnego uruchomienia wszystkich działań. Zastąp symbole zastępcze własnym odpowiednim czasem dla parametrów RunStartedAfter i RunStartedBefore. W tym samouczku użyto parametrów -RunStartedAfter "2017/09/14" oraz -RunStartedBefore "2017/09/15".
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
Oto przykładowe dane wyjściowe:
ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupNewWaterMarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {NewWatermarkvalue} LinkedServiceName : ActivityRunStart : 9/14/2017 7:42:42 AM ActivityRunEnd : 9/14/2017 7:42:50 AM DurationInMs : 7777 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupOldWaterMarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {TableName, WatermarkValue} LinkedServiceName : ActivityRunStart : 9/14/2017 7:42:42 AM ActivityRunEnd : 9/14/2017 7:43:07 AM DurationInMs : 25437 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : IncrementalCopyActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, sink} Output : {dataRead, dataWritten, rowsCopied, copyDuration...} LinkedServiceName : ActivityRunStart : 9/14/2017 7:43:10 AM ActivityRunEnd : 9/14/2017 7:43:29 AM DurationInMs : 19769 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : StoredProceduretoWriteWatermarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {storedProcedureName, storedProcedureParameters} Output : {} LinkedServiceName : ActivityRunStart : 9/14/2017 7:43:32 AM ActivityRunEnd : 9/14/2017 7:43:47 AM DurationInMs : 14467 Status : Succeeded Error : {errorCode, message, failureType, target}
Sprawdzanie wyników
W magazynie obiektów blob (magazynie ujścia) zobaczysz, że dane zostały skopiowane do pliku zdefiniowanego w parametrze SinkDataset. W tym samouczku nazwą pliku jest
Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt
. Otwórz plik, aby wyświetlić rekordy w pliku, identyczne z danymi w bazie danych SQL.1,aaaa,2017-09-01 00:56:00.0000000 2,bbbb,2017-09-02 05:23:00.0000000 3,cccc,2017-09-03 02:36:00.0000000 4,dddd,2017-09-04 03:21:00.0000000 5,eeee,2017-09-05 08:06:00.0000000
Sprawdź najnowszą wartość w tabeli
watermarktable
. Zobaczysz, że wartość limitu została zaktualizowana.Select * from watermarktable
Oto przykładowe dane wyjściowe:
TableName WatermarkValue data_source_table 2017-09-05 8:06:00.000
Wstawianie danych ze źródła danych w celu sprawdzenia ładowania danych różnicowych
Wstaw nowe dane do bazy danych SQL (źródłowego magazynu danych źródłowych).
INSERT INTO data_source_table VALUES (6, 'newdata','9/6/2017 2:23:00 AM') INSERT INTO data_source_table VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Zaktualizowane dane w bazie danych SQL są następujące:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000 6 | newdata | 2017-09-06 02:23:00.000 7 | newdata | 2017-09-07 09:01:00.000
Uruchom ponownie potok IncrementalCopyPipeline przy użyciu polecenia cmdlet Invoke-AzDataFactoryV2Pipeline . Zastąp symbole zastępcze własną nazwą grupy zasobów i fabryki danych.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
Sprawdź stan potoku, uruchamiając polecenie cmdlet Get-AzDataFactoryV2ActivityRun do momentu pomyślnego uruchomienia wszystkich działań. Zastąp symbole zastępcze własnym odpowiednim czasem dla parametrów RunStartedAfter i RunStartedBefore. W tym samouczku użyto parametrów -RunStartedAfter "2017/09/14" oraz -RunStartedBefore "2017/09/15".
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
Oto przykładowe dane wyjściowe:
ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupNewWaterMarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {NewWatermarkvalue} LinkedServiceName : ActivityRunStart : 9/14/2017 8:52:26 AM ActivityRunEnd : 9/14/2017 8:52:58 AM DurationInMs : 31758 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupOldWaterMarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {TableName, WatermarkValue} LinkedServiceName : ActivityRunStart : 9/14/2017 8:52:26 AM ActivityRunEnd : 9/14/2017 8:52:52 AM DurationInMs : 25497 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : IncrementalCopyActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, sink} Output : {dataRead, dataWritten, rowsCopied, copyDuration...} LinkedServiceName : ActivityRunStart : 9/14/2017 8:53:00 AM ActivityRunEnd : 9/14/2017 8:53:20 AM DurationInMs : 20194 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : StoredProceduretoWriteWatermarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {storedProcedureName, storedProcedureParameters} Output : {} LinkedServiceName : ActivityRunStart : 9/14/2017 8:53:23 AM ActivityRunEnd : 9/14/2017 8:53:41 AM DurationInMs : 18502 Status : Succeeded Error : {errorCode, message, failureType, target}
W magazynie obiektów blob zobaczysz, że utworzono kolejny plik. W tym samouczku nazwa nowego pliku to
Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt
. Otwórz ten plik — zobaczysz w nim dwa wiersze rekordów.Sprawdź najnowszą wartość w tabeli
watermarktable
. Zobaczysz, że wartość limitu została ponownie zaktualizowana.Select * from watermarktable
Przykładowe dane wyjściowe:
TableName WatermarkValue data_source_table 2017-09-07 09:01:00.000
Powiązana zawartość
W ramach tego samouczka wykonano następujące procedury:
- Przygotowywanie magazynu danych do przechowywania wartości limitu.
- Tworzenie fabryki danych.
- Tworzenie połączonych usług.
- Tworzenie zestawów danych źródła, ujścia i limitu.
- Tworzenie potoku.
- Uruchom potok.
- Monitorowanie uruchomienia potoku.
W tym samouczku potok skopiował dane z pojedynczej tabeli w usłudze Azure SQL Database do usługi Blob Storage. Przejdź do poniższego samouczka, aby dowiedzieć się, jak kopiować dane z wielu tabel w bazie danych programu SQL Server do usługi SQL Database.