Větvení a řetězení aktivit v kanálech Data Factory
PLATÍ PRO: Azure Data Factory Azure Synapse Analytics
Tip
Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.
V tomto kurzu vytvoříte kanál služby Data Factory, který předvádí některé funkce toku řízení. Tento kanál zkopíruje z kontejneru ve službě Azure Blob Storage do jiného kontejneru ve stejném účtu úložiště. Pokud aktivita kopírování proběhne úspěšně, kanál odešle podrobnosti o úspěšné operaci kopírování v e-mailu. Tyto informace můžou zahrnovat množství zapsaných dat. Pokud aktivita kopírování selže, odešle podrobnosti o selhání kopírování, jako je například chybová zpráva, v e-mailu. V rámci tohoto kurzu se dozvíte, jak předávat parametry.
Tento obrázek poskytuje přehled scénáře:
V tomto kurzu se dozvíte, jak provádět následující úlohy:
- Vytvoření datové továrny
- Vytvoření propojené služby Azure Storage
- Vytvoření datové sady Azure Blob
- Vytvoření kanálu, který obsahuje aktivitu kopírování a aktivitu webu
- Odeslání výstupů aktivit následným aktivitám
- Použití předávání parametrů a systémových proměnných
- Spuštění kanálu
- Monitorování spuštění aktivit a kanálu
Tento kurz používá .NET SDK. K interakci se službou Azure Data Factory můžete použít jiné mechanismy. Rychlé starty služby Data Factory najdete v 5minutových rychlých startech.
Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.
Požadavky
- Účet služby Azure Storage. Úložiště objektů blob použijete jako zdrojové úložiště dat. Pokud nemáte účet úložiště Azure, přečtěte si téma Vytvoření účtu úložiště.
- Azure Storage Explorer. Pokud chcete tento nástroj nainstalovat, přečtěte si Průzkumník služby Azure Storage.
- Azure SQL Database Tuto databázi použijete jako úložiště dat jímky. Pokud nemáte databázi ve službě Azure SQL Database, přečtěte si téma Vytvoření databáze ve službě Azure SQL Database.
- Visual Studio. Tento článek používá Visual Studio 2019.
- Azure .NET SDK. Stáhněte a nainstalujte sadu Azure .NET SDK.
Seznam oblastí Azure, ve kterých je služba Data Factory aktuálně dostupná, najdete v tématu Produkty dostupné v jednotlivých oblastech. Úložiště a výpočty dat můžou být v jiných oblastech. Úložiště zahrnují Azure Storage a Azure SQL Database. Výpočetní prostředky zahrnují HDInsight, které služba Data Factory používá.
Vytvořte aplikaci, jak je popsáno v tématu Vytvoření aplikace Microsoft Entra. Podle pokynů ve stejném článku přiřaďte aplikaci k roli Přispěvatel . Pro pozdější části tohoto kurzu budete potřebovat několik hodnot, například ID aplikace (klienta) a ID adresáře (tenanta).
Vytvoření tabulky objektů blob
Otevřete textový editor. Zkopírujte následující text a uložte ho místně jako input.txt.
Ethel|Berg Tamika|Walsh
Otevřete Průzkumník služby Azure Storage. Rozbalte svůj účet úložiště. Pravým tlačítkem klikněte na Kontejnery objektů blob a vyberte Vytvořit kontejner objektů blob.
Pojmenujte nový kontejner adfv2branch a vyberte Nahrát a přidejte do kontejneru input.txt soubor.
Vytvoření projektu sady Visual Studio
Vytvoření konzolové aplikace C# .NET:
- Spusťte Visual Studio a vyberte Vytvořit nový projekt.
- V části Vytvořit nový projekt zvolte Konzolová aplikace (.NET Framework) pro C# a vyberte Další.
- Pojmenujte projekt ADFv2BranchTutorial.
- Vyberte .NET verze 4.5.2 nebo novější a pak vyberte Vytvořit.
Instalace balíčků NuGet
Vyberte nástroje>NuGet Správce balíčků> Správce balíčků konzoly.
V konzole Správce balíčků spusťte následující příkazy pro instalaci balíčků. Podrobnosti najdete v balíčku NuGet Microsoft.Azure.Management.DataFactory.
Install-Package Microsoft.Azure.Management.DataFactory Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
Vytvoření klienta datové továrny
Otevřete Program.cs a přidejte následující příkazy:
using System; using System.Collections.Generic; using System.Linq; using Microsoft.Rest; using Microsoft.Azure.Management.ResourceManager; using Microsoft.Azure.Management.DataFactory; using Microsoft.Azure.Management.DataFactory.Models; using Microsoft.IdentityModel.Clients.ActiveDirectory;
Přidejte tyto statické proměnné do
Program
třídy. Zástupné znaky nahraďte vlastními hodnotami.// Set variables static string tenantID = "<tenant ID>"; static string applicationId = "<application ID>"; static string authenticationKey = "<Authentication key for your application>"; static string subscriptionId = "<Azure subscription ID>"; static string resourceGroup = "<Azure resource group name>"; static string region = "East US"; static string dataFactoryName = "<Data factory name>"; // Specify the source Azure Blob information static string storageAccount = "<Azure Storage account name>"; static string storageKey = "<Azure Storage account key>"; // confirm that you have the input.txt file placed in th input folder of the adfv2branch container. static string inputBlobPath = "adfv2branch/input"; static string inputBlobName = "input.txt"; static string outputBlobPath = "adfv2branch/output"; static string emailReceiver = "<specify email address of the receiver>"; static string storageLinkedServiceName = "AzureStorageLinkedService"; static string blobSourceDatasetName = "SourceStorageDataset"; static string blobSinkDatasetName = "SinkStorageDataset"; static string pipelineName = "Adfv2TutorialBranchCopy"; static string copyBlobActivity = "CopyBlobtoBlob"; static string sendFailEmailActivity = "SendFailEmailActivity"; static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
Do metody
Main
přidejte následující kód. Tento kód vytvoří instanciDataFactoryManagementClient
třídy. Tento objekt pak použijete k vytvoření datové továrny, propojené služby, datových sad a kanálu. Tento objekt můžete také použít k monitorování podrobností o spuštění kanálu.// Authenticate and create a data factory management client var context = new AuthenticationContext("https://login.windows.net/" + tenantID); ClientCredential cc = new ClientCredential(applicationId, authenticationKey); AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result; ServiceClientCredentials cred = new TokenCredentials(result.AccessToken); var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Vytvoření datové továrny
Přidejte do souboru Program.cs metodu
CreateOrUpdateDataFactory
:static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client) { Console.WriteLine("Creating data factory " + dataFactoryName + "..."); Factory resource = new Factory { Location = region }; Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings)); Factory response; { response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource); } while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation") { System.Threading.Thread.Sleep(1000); } return response; }
Do metody, která vytvoří datovou továrnu, přidejte následující řádek
Main
:Factory df = CreateOrUpdateDataFactory(client);
Vytvoření propojené služby Azure Storage
Přidejte do souboru Program.cs metodu
StorageLinkedServiceDefinition
:static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client) { Console.WriteLine("Creating linked service " + storageLinkedServiceName + "..."); AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService { ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey) }; Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)); LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName); return linkedService; }
Do metody, která vytvoří propojenou službu Azure Storage, přidejte následující řádek
Main
:client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
Další informace o podporovaných vlastnostech a podrobnostech naleznete v tématu Vlastnosti propojené služby.
Vytvoření datových sad
V této části vytvoříte dvě datové sady, jednu pro zdroj a jednu pro jímku.
Vytvoření datové sady pro zdrojový objekt blob Azure
Přidejte metodu , která vytvoří datovou sadu objektů blob Azure. Další informace o podporovaných vlastnostech a podrobnostech najdete v tématu Vlastnosti datové sady Azure Blob.
Přidejte do souboru Program.cs metoduSourceBlobDatasetDefinition
:
static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
AzureBlobDataset blobDataset = new AzureBlobDataset
{
FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
FileName = inputBlobName,
LinkedServiceName = new LinkedServiceReference
{
ReferenceName = storageLinkedServiceName
}
};
Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
return dataset;
}
Nadefinujete datovou sadu, která představuje zdrojová data v objektu blob Azure. Tato datová sada objektů blob odkazuje na propojenou službu Azure Storage podporovanou v předchozím kroku. Datová sada objektů blob popisuje umístění objektu blob, ze které se má kopírovat: FolderPath a FileName.
Všimněte si použití parametrů pro FolderPath. sourceBlobContainer
je název parametru a výraz se nahradí hodnotami předanými při spuštění kanálu. Syntaxe pro definování parametrů je @pipeline().parameters.<parameterName>
Vytvoření datové sady pro azure blob jímky
Přidejte do souboru Program.cs metodu
SourceBlobDatasetDefinition
:static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client) { Console.WriteLine("Creating dataset " + blobSinkDatasetName + "..."); AzureBlobDataset blobDataset = new AzureBlobDataset { FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" }, LinkedServiceName = new LinkedServiceReference { ReferenceName = storageLinkedServiceName } }; Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)); DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName); return dataset; }
Do metody, která vytvoří zdroj objektů blob Azure i datové sady jímky, přidejte následující kód
Main
.client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client)); client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
Vytvoření třídy EmailRequest v C#
V projektu jazyka C# vytvořte třídu s názvem EmailRequest
. Tato třída definuje, jaké vlastnosti kanál odesílá v požadavku textu při odesílání e-mailu. V tomto kurzu kanál do e-mailu odešle čtyři vlastnosti:
- Message. Text e-mailu Pro úspěšnou kopii tato vlastnost obsahuje množství zapsaných dat. U neúspěšné kopie obsahuje tato vlastnost podrobnosti o chybě.
- Název datové továrny Název datové továrny.
- Název kanálu Název kanálu.
- Přijímač. Parametr, který prochází. Tato vlastnost určuje příjemce e-mailu.
class EmailRequest
{
[Newtonsoft.Json.JsonProperty(PropertyName = "message")]
public string message;
[Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
public string dataFactoryName;
[Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
public string pipelineName;
[Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
public string receiver;
public EmailRequest(string input, string df, string pipeline, string receiverName)
{
message = input;
dataFactoryName = df;
pipelineName = pipeline;
receiver = receiverName;
}
}
Vytvoření koncových bodů pracovního postupu pro e-maily
K aktivaci odesílání e-mailu použijete Azure Logic Apps k definování pracovního postupu. Další informace najdete v tématu Vytvoření ukázkového pracovního postupu aplikace logiky Consumption.
Pracovní postup pro e-maily s informací o úspěchu
Na webu Azure Portal vytvořte pracovní postup aplikace logiky s názvem CopySuccessEmail
. Přidejte trigger požadavku s názvem Při přijetí požadavku HTTP. V triggeru požadavku vyplňte do pole Schématu JSON textu požadavku následující kód JSON:
{
"properties": {
"dataFactoryName": {
"type": "string"
},
"message": {
"type": "string"
},
"pipelineName": {
"type": "string"
},
"receiver": {
"type": "string"
}
},
"type": "object"
}
Váš pracovní postup vypadá přibližně jako v následujícím příkladu:
Tento obsah JSON odpovídá EmailRequest
třídě, kterou jste vytvořili v předchozí části.
Přidejte akci Office 365 Outlook s názvem Odeslat e-mail. Pro tuto akci si přizpůsobte, jak chcete e-mail formátovat, pomocí vlastností předaných ve schématu JSON textu požadavku. Tady je příklad:
Po uložení pracovního postupu zkopírujte a uložte hodnotu adresy URL HTTP POST z triggeru.
Pracovní postup pro e-maily s informací o úspěchu
Naklonujte CopySuccessEmail
pracovní postup aplikace logiky do nového pracovního postupu s názvem CopyFailEmail
. V triggeru požadavku je schéma JSON textu požadavku stejné. Změňte formát e-mailu, například Subject
, tak, aby to odpovídalo neúspěchu. Zde je příklad:
Po uložení pracovního postupu zkopírujte a uložte hodnotu adresy URL HTTP POST z triggeru.
Teď byste měli mít dvě adresy URL pracovního postupu, například následující příklady:
//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
Vytvořit kanál
Vraťte se do projektu v sadě Visual Studio. Teď přidáme kód, který vytvoří kanál s aktivitou kopírování a DependsOn
vlastností. V tomto kurzu kanál obsahuje jednu aktivitu, aktivitu kopírování, která přebírá datovou sadu objektů blob jako zdroj a jinou datovou sadu objektů blob jako jímku. Pokud aktivita kopírování proběhne úspěšně nebo selže, volá různé e-mailové úlohy.
V tomto kanálu použijete následující funkce:
- Parametry
- Webová aktivita
- Závislost aktivit
- Použití výstupu z aktivity jako vstupu do jiné aktivity
Přidejte tuto metodu do projektu. Další podrobnosti najdete v následujících částech.
static PipelineResource PipelineDefinition(DataFactoryManagementClient client) { Console.WriteLine("Creating pipeline " + pipelineName + "..."); PipelineResource resource = new PipelineResource { Parameters = new Dictionary<string, ParameterSpecification> { { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } }, { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } }, { "receiver", new ParameterSpecification { Type = ParameterType.String } } }, Activities = new List<Activity> { new CopyActivity { Name = copyBlobActivity, Inputs = new List<DatasetReference> { new DatasetReference { ReferenceName = blobSourceDatasetName } }, Outputs = new List<DatasetReference> { new DatasetReference { ReferenceName = blobSinkDatasetName } }, Source = new BlobSource { }, Sink = new BlobSink { } }, new WebActivity { Name = sendSuccessEmailActivity, Method = WebActivityMethod.POST, Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000", Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"), DependsOn = new List<ActivityDependency> { new ActivityDependency { Activity = copyBlobActivity, DependencyConditions = new List<String> { "Succeeded" } } } }, new WebActivity { Name = sendFailEmailActivity, Method =WebActivityMethod.POST, Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000", Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"), DependsOn = new List<ActivityDependency> { new ActivityDependency { Activity = copyBlobActivity, DependencyConditions = new List<String> { "Failed" } } } } } }; Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings)); return resource; }
Do metody, která vytvoří kanál, přidejte následující řádek
Main
:client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
Parametry
První část kódu kanálu definuje parametry.
sourceBlobContainer
. Zdrojová datová sada objektů blob využívá tento parametr v kanálu.sinkBlobContainer
. Datová sada objektů blob jímky využívá tento parametr v kanálu.receiver
. Tento parametr používají dvě webové aktivity v kanálu, které odesílají e-maily o úspěchu nebo neúspěchu příjemci.
Parameters = new Dictionary<string, ParameterSpecification>
{
{ "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
{ "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
{ "receiver", new ParameterSpecification { Type = ParameterType.String } }
},
Webová aktivita
Webová aktivita umožňuje volání libovolného koncového bodu REST. Další informace o aktivitě najdete v tématu Webová aktivita ve službě Azure Data Factory. Tento kanál používá webovou aktivitu k volání pracovního postupu e-mailu Logic Apps. Vytvoříte dvě webové aktivity: jednu, která volá CopySuccessEmail
pracovní postup, a jednu, která volá CopyFailWorkFlow
.
new WebActivity
{
Name = sendCopyEmailActivity,
Method = WebActivityMethod.POST,
Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
DependsOn = new List<ActivityDependency>
{
new ActivityDependency
{
Activity = copyBlobActivity,
DependencyConditions = new List<String> { "Succeeded" }
}
}
}
Url
Do vlastnosti vložte koncové body adresy URL HTTP POST z pracovních postupů Logic Apps. Body
Ve vlastnosti předejte instanci EmailRequest
třídy. Obsahuje následující vlastnosti:
- Message. Předává hodnotu
@{activity('CopyBlobtoBlob').output.dataWritten
. Přistupuje k vlastnosti předchozí aktivity kopírování a předává hodnotudataWritten
. V případě neúspěchu předejte výstup chyby místo@{activity('CopyBlobtoBlob').error.message
. - Název datové továrny. Předá hodnotu
@{pipeline().DataFactory}
Této systémové proměnné, která umožňuje přístup k odpovídajícímu názvu datové továrny. Seznam systémových proměnných najdete v tématu Systémové proměnné. - Název kanálu Předává hodnotu
@{pipeline().Pipeline}
. Tato systémová proměnná umožňuje přístup k odpovídajícímu názvu kanálu. - Přijímač. Předává hodnotu
"@pipeline().parameters.receiver"
. Přistupuje k parametrům kanálu.
Tento kód vytvoří novou závislost aktivity, která závisí na předchozí aktivitě kopírování.
Vytvoření spuštění kanálu
Do metody, která aktivuje spuštění kanálu, přidejte následující kód Main
.
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
{ "sourceBlobContainer", inputBlobPath },
{ "sinkBlobContainer", outputBlobPath },
{ "receiver", emailReceiver }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
Hlavní třída
Main
Konečná metoda by měla vypadat takto.
// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Factory df = CreateOrUpdateDataFactory(client);
client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
{ "sourceBlobContainer", inputBlobPath },
{ "sinkBlobContainer", outputBlobPath },
{ "receiver", emailReceiver }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
Sestavte a spusťte program pro aktivaci spuštění kanálu!
Monitorování spuštění kanálu
Do metody
Main
přidejte následující kód:// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); PipelineRun pipelineRun; while (true) { pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress") System.Threading.Thread.Sleep(15000); else break; }
Tento kód nepřetržitě kontroluje stav spuštění, dokud nedokončí kopírování dat.
Do metody, která načte podrobnosti o spuštění aktivity kopírování, přidejte následující kód
Main
, například velikost načtených a zapsaných dat:// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun( resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList(); if (pipelineRun.Status == "Succeeded") { Console.WriteLine(activityRuns.First().Output); //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons); } else Console.WriteLine(activityRuns.First().Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
Spuštění kódu
Sestavte a spusťte aplikaci a potom ověřte spuštění kanálu.
Aplikace zobrazí průběh vytváření datové továrny, propojené služby, datových sad, kanálu a spuštění kanálu. Potom zkontroluje stav spuštění kanálu. Počkejte, dokud aktivita kopírování nezobrazí údaje o velikosti načtených/zapsaných dat. Potom pomocí nástrojů, jako je Průzkumník služby Azure Storage, zkontrolujte, že se objekt blob zkopíroval do výstupuBlobPath ze inputBlobPath, jak jste zadali v proměnných.
Výstup by měl vypadat přibližně jako v následující ukázce:
Creating data factory DFTutorialTest...
{
"location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
}
}
Creating dataset SourceStorageDataset...
{
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"type": "Expression",
"value": "@pipeline().parameters.sourceBlobContainer"
},
"fileName": "input.txt"
},
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureStorageLinkedService"
}
}
Creating dataset SinkStorageDataset...
{
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"type": "Expression",
"value": "@pipeline().parameters.sinkBlobContainer"
}
},
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureStorageLinkedService"
}
}
Creating pipeline Adfv2TutorialBranchCopy...
{
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
}
},
"inputs": [
{
"type": "DatasetReference",
"referenceName": "SourceStorageDataset"
}
],
"outputs": [
{
"type": "DatasetReference",
"referenceName": "SinkStorageDataset"
}
],
"name": "CopyBlobtoBlob"
},
{
"type": "WebActivity",
"typeProperties": {
"method": "POST",
"url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
"body": {
"message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
"dataFactoryName": "@{pipeline().DataFactory}",
"pipelineName": "@{pipeline().Pipeline}",
"receiver": "@pipeline().parameters.receiver"
}
},
"name": "SendSuccessEmailActivity",
"dependsOn": [
{
"activity": "CopyBlobtoBlob",
"dependencyConditions": [
"Succeeded"
]
}
]
},
{
"type": "WebActivity",
"typeProperties": {
"method": "POST",
"url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
"body": {
"message": "@{activity('CopyBlobtoBlob').error.message}",
"dataFactoryName": "@{pipeline().DataFactory}",
"pipelineName": "@{pipeline().Pipeline}",
"receiver": "@pipeline().parameters.receiver"
}
},
"name": "SendFailEmailActivity",
"dependsOn": [
{
"activity": "CopyBlobtoBlob",
"dependencyConditions": [
"Failed"
]
}
]
}
],
"parameters": {
"sourceBlobContainer": {
"type": "String"
},
"sinkBlobContainer": {
"type": "String"
},
"receiver": {
"type": "String"
}
}
}
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
"dataRead": 20,
"dataWritten": 20,
"copyDuration": 4,
"throughput": 0.01,
"errors": [],
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}
Press any key to exit...
Související obsah
V tomto kurzu jste provedli následující úlohy:
- Vytvoření datové továrny
- Vytvoření propojené služby Azure Storage
- Vytvoření datové sady Azure Blob
- Vytvoření kanálu, který obsahuje aktivitu kopírování a aktivitu webu
- Odeslání výstupů aktivit následným aktivitám
- Použití předávání parametrů a systémových proměnných
- Spuštění kanálu
- Monitorování spuštění aktivit a kanálu
Teď můžete pokračovat v části Koncepty, kde najdete další informace o službě Azure Data Factory.