Branchning och kedjesammansättning av aktiviteter i en Data Factory-pipeline
GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics
Dricks
Prova Data Factory i Microsoft Fabric, en allt-i-ett-analyslösning för företag. Microsoft Fabric omfattar allt från dataflytt till datavetenskap, realtidsanalys, business intelligence och rapportering. Lär dig hur du startar en ny utvärderingsversion kostnadsfritt!
I den här självstudien skapar du en Data Factory-pipeline som visar vissa kontrollflödesfunktioner. Den här pipelinen kopierar från en container i Azure Blob Storage till en annan container i samma lagringskonto. Om kopieringsaktiviteten lyckas skickar pipelinen information om den lyckade kopieringsåtgärden i ett e-postmeddelande. Den informationen kan omfatta mängden data som skrivits. Om kopieringsaktiviteten misslyckas skickar den information om kopieringsfelet, till exempel felmeddelandet, i ett e-postmeddelande. I självstudiekursen visas olika exempel på hur du skickar parametrar.
Den här bilden ger en översikt över scenariot:
Den här självstudien visar hur du utför följande uppgifter:
- Skapa en datafabrik
- Skapa en länkad Azure Storage-tjänst
- Skapa en Azure Blob-datauppsättning
- Skapa en pipeline som innehåller en kopieringsaktivitet och en webbaktivitet
- Skicka utdata för aktiviteter till efterföljande aktiviteter
- Använda parameteröverföring och systemvariabler
- Starta en pipelinekörning
- Övervaka pipelinen och aktivitetskörningar
I den här självstudiekursen används .NET SDK. Du kan använda andra mekanismer för att interagera med Azure Data Factory. Snabbstarter för Data Factory finns i snabbstarter på 5 minuter.
Om du inte har någon Azure-prenumeration skapar du ett kostnadsfritt konto innan du börjar.
Förutsättningar
- Azure Storage-konto. Du använder bloblagring som källdatalager. Om du inte har något Azure-lagringskonto kan du läsa Skapa ett lagringskonto.
- Azure Storage Explorer. Information om hur du installerar det här verktyget finns i Azure Storage Explorer.
- Azure SQL Database. Du använder databasen som mottagare för datalagringen. Om du inte har någon databas i Azure SQL Database läser du Skapa en databas i Azure SQL Database.
- Visual Studio. Den här artikeln använder Visual Studio 2019.
- Azure .NET SDK. Ladda ned och installera Azure .NET SDK.
En lista över Azure-regioner där Data Factory för närvarande är tillgängligt finns i Produkter tillgängliga per region. Datalager och beräkningar kan finnas i andra regioner. Bland butikerna finns Azure Storage och Azure SQL Database. Beräkningen omfattar HDInsight, som Data Factory använder.
Skapa ett program enligt beskrivningen i Skapa ett Microsoft Entra-program. Tilldela programmet rollen Deltagare genom att följa anvisningarna i samma artikel. Du behöver flera värden för senare delar av den här självstudien, till exempel program-ID (klient)-ID och katalog-ID (klientorganisation).
Skapa en blobtabell
Öppna en textredigerare. Kopiera följande text och spara den lokalt som input.txt.
Ethel|Berg Tamika|Walsh
Öppna Azure Storage Explorer. Expandera ditt lagringskonto. Högerklicka på Blobcontainrar och välj Skapa blobcontainer.
Ge den nya containern namnet adfv2branch och välj Ladda upp för att lägga till filen input.txt i containern.
Skapa Visual Studio-projekt
Skapa ett C# .NET-konsolprogram:
- Starta Visual Studio och välj Skapa ett nytt projekt.
- I Skapa ett nytt projekt väljer du Konsolapp (.NET Framework) för C# och väljer Nästa.
- Ge projektet namnet ADFv2BranchTutorial.
- Välj .NET version 4.5.2 eller senare och välj sedan Skapa.
Installera NuGet-paket
Välj Verktyg>NuGet Package Manager Package Manager-konsolen.>
Kör följande kommandon i Package Manager Console för att installera paket. Mer information finns i Nuget-paketet Microsoft.Azure.Management.DataFactory.
Install-Package Microsoft.Azure.Management.DataFactory Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
Skapa en datafabriksklient
Öppna Program.cs och lägg till följande instruktioner:
using System; using System.Collections.Generic; using System.Linq; using Microsoft.Rest; using Microsoft.Azure.Management.ResourceManager; using Microsoft.Azure.Management.DataFactory; using Microsoft.Azure.Management.DataFactory.Models; using Microsoft.IdentityModel.Clients.ActiveDirectory;
Lägg till dessa statiska variabler i
Program
klassen. Ersätt platshållarna med dina egna värden.// Set variables static string tenantID = "<tenant ID>"; static string applicationId = "<application ID>"; static string authenticationKey = "<Authentication key for your application>"; static string subscriptionId = "<Azure subscription ID>"; static string resourceGroup = "<Azure resource group name>"; static string region = "East US"; static string dataFactoryName = "<Data factory name>"; // Specify the source Azure Blob information static string storageAccount = "<Azure Storage account name>"; static string storageKey = "<Azure Storage account key>"; // confirm that you have the input.txt file placed in th input folder of the adfv2branch container. static string inputBlobPath = "adfv2branch/input"; static string inputBlobName = "input.txt"; static string outputBlobPath = "adfv2branch/output"; static string emailReceiver = "<specify email address of the receiver>"; static string storageLinkedServiceName = "AzureStorageLinkedService"; static string blobSourceDatasetName = "SourceStorageDataset"; static string blobSinkDatasetName = "SinkStorageDataset"; static string pipelineName = "Adfv2TutorialBranchCopy"; static string copyBlobActivity = "CopyBlobtoBlob"; static string sendFailEmailActivity = "SendFailEmailActivity"; static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
Lägg till följande kod i metoden
Main
. Den här koden skapar en instans avDataFactoryManagementClient
klassen. Sedan använder du det här objektet för att skapa datafabrik, länkad tjänst, datauppsättningar och pipeline. Du kan också använda det här objektet för att övervaka pipelinekörningsinformationen.// Authenticate and create a data factory management client var context = new AuthenticationContext("https://login.windows.net/" + tenantID); ClientCredential cc = new ClientCredential(applicationId, authenticationKey); AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result; ServiceClientCredentials cred = new TokenCredentials(result.AccessToken); var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Skapa en datafabrik
Lägg till en
CreateOrUpdateDataFactory
metod i din Program.cs-fil :static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client) { Console.WriteLine("Creating data factory " + dataFactoryName + "..."); Factory resource = new Factory { Location = region }; Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings)); Factory response; { response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource); } while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation") { System.Threading.Thread.Sleep(1000); } return response; }
Lägg till följande rad i metoden
Main
som skapar en datafabrik:Factory df = CreateOrUpdateDataFactory(client);
Skapa en länkad Azure Storage-tjänst
Lägg till en
StorageLinkedServiceDefinition
metod i din Program.cs-fil :static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client) { Console.WriteLine("Creating linked service " + storageLinkedServiceName + "..."); AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService { ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey) }; Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)); LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName); return linkedService; }
Lägg till följande rad i metoden
Main
som skapar en länkad Azure Storage-tjänst:client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
Mer information om egenskaper och information som stöds finns i Länkade tjänstegenskaper.
Skapa datauppsättningar
I det här avsnittet skapar du två datauppsättningar, en för källan och en för mottagaren.
Skapa en datauppsättning för en Azure-källblob
Lägg till en metod som skapar en Azure Blob-datauppsättning. Mer information om egenskaper och information som stöds finns i Egenskaper för Azure Blob-datauppsättning.
Lägg till en SourceBlobDatasetDefinition
metod i din Program.cs-fil :
static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
AzureBlobDataset blobDataset = new AzureBlobDataset
{
FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
FileName = inputBlobName,
LinkedServiceName = new LinkedServiceReference
{
ReferenceName = storageLinkedServiceName
}
};
Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
return dataset;
}
Du definierar en datauppsättning som representerar källdata i Azure Blob. Den här blobdatauppsättningen refererar till den länkade Azure Storage-tjänst som stöds i föregående steg. Blobdatauppsättningen beskriver platsen för bloben som ska kopieras från: FolderPath och FileName.
Observera användningen av parametrar för FolderPath. sourceBlobContainer
är namnet på parametern och uttrycket ersätts med de värden som skickas i pipelinekörningen. Syntaxen för att definiera parametrar är @pipeline().parameters.<parameterName>
Skapa en datauppsättning för en azure-mottagarblob
Lägg till en
SourceBlobDatasetDefinition
metod i din Program.cs-fil :static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client) { Console.WriteLine("Creating dataset " + blobSinkDatasetName + "..."); AzureBlobDataset blobDataset = new AzureBlobDataset { FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" }, LinkedServiceName = new LinkedServiceReference { ReferenceName = storageLinkedServiceName } }; Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)); DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName); return dataset; }
Lägg till följande kod i metoden
Main
som skapar både Azure Blob-käll- och mottagardatauppsättningar.client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client)); client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
Skapa en C#-klass: EmailRequest
I ditt C#-projekt skapar du en klass med namnet EmailRequest
. Den här klassen definierar vilka egenskaper pipelinen skickar i brödtextbegäran när du skickar ett e-postmeddelande. I den här självstudiekursen skickar pipelinen fyra egenskaper från pipelinen till e-postmeddelandet:
- Meddelande. Brödtext för e-postmeddelandet. För en lyckad kopia innehåller den här egenskapen mängden data som skrivits. För en misslyckad kopia innehåller den här egenskapen information om felet.
- Namn på datafabrik. Namnet på datafabriken.
- Pipelinenamn. Namnet på pipeline.
- Mottagare. Parameter som passerar igenom. Den här egenskapen anger mottagaren av e-postmeddelandet.
class EmailRequest
{
[Newtonsoft.Json.JsonProperty(PropertyName = "message")]
public string message;
[Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
public string dataFactoryName;
[Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
public string pipelineName;
[Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
public string receiver;
public EmailRequest(string input, string df, string pipeline, string receiverName)
{
message = input;
dataFactoryName = df;
pipelineName = pipeline;
receiver = receiverName;
}
}
Skapa slutpunkter för e-postarbetsflödet
Om du vill utlösa sändning av ett e-postmeddelande använder du Azure Logic Apps för att definiera arbetsflödet. Mer information finns i Skapa ett exempel på arbetsflöde för förbrukningslogikapp.
Lyckat e-postarbetsflöde
I Azure Portal skapar du ett logikapparbetsflöde med namnet CopySuccessEmail
. Lägg till utlösaren Förfrågning med namnet När en HTTP-begäran tas emot. I utlösaren Begäran fyller du i JSON-schemarutan Begärandetext med följande JSON:
{
"properties": {
"dataFactoryName": {
"type": "string"
},
"message": {
"type": "string"
},
"pipelineName": {
"type": "string"
},
"receiver": {
"type": "string"
}
},
"type": "object"
}
Arbetsflödet ser ut ungefär som i följande exempel:
Det här JSON-innehållet överensstämmer med den EmailRequest
klass som du skapade i föregående avsnitt.
Lägg till office 365 Outlook-åtgärden med namnet Skicka ett e-postmeddelande. För den här åtgärden anpassar du hur du vill formatera e-postmeddelandet med hjälp av egenskaperna som skickas i JSON-schemat för begärandetext. Här är ett exempel:
När du har sparat arbetsflödet kopierar du och sparar VÄRDET FÖR HTTP POST-URL från utlösaren.
Arbetsflöde för e-postmeddelande om misslyckad kopiering
Klona logikappens CopySuccessEmail
arbetsflöde till ett nytt arbetsflöde med namnet CopyFailEmail
. I begärandeutlösaren är JSON-schemat för begärandetext detsamma. Ändra formatet för ditt e-postmeddelande som Subject
för att skapa ett e-postmeddelande om att kopieringen misslyckats. Här är ett exempel:
När du har sparat arbetsflödet kopierar du och sparar VÄRDET FÖR HTTP POST-URL från utlösaren.
Du bör nu ha två arbetsflödes-URL:er, som följande exempel:
//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
Skapa en pipeline
Gå tillbaka till projektet i Visual Studio. Nu ska vi lägga till koden som skapar en pipeline med en kopieringsaktivitet och DependsOn
-egenskap. I den här självstudien innehåller pipelinen en aktivitet, en kopieringsaktivitet, som tar in blobdatauppsättningen som källa och en annan Blob-datauppsättning som mottagare. Om kopieringsaktiviteten lyckas eller misslyckas anropas olika e-postuppgifter.
I denna pipeline kan du använda följande funktioner:
- Parametrar
- Webbaktivitet
- Aktivitetsberoende
- Använda utdata från en aktivitet som indata till en annan aktivitet
Lägg till den här metoden i projektet. Följande avsnitt innehåller mer information.
static PipelineResource PipelineDefinition(DataFactoryManagementClient client) { Console.WriteLine("Creating pipeline " + pipelineName + "..."); PipelineResource resource = new PipelineResource { Parameters = new Dictionary<string, ParameterSpecification> { { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } }, { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } }, { "receiver", new ParameterSpecification { Type = ParameterType.String } } }, Activities = new List<Activity> { new CopyActivity { Name = copyBlobActivity, Inputs = new List<DatasetReference> { new DatasetReference { ReferenceName = blobSourceDatasetName } }, Outputs = new List<DatasetReference> { new DatasetReference { ReferenceName = blobSinkDatasetName } }, Source = new BlobSource { }, Sink = new BlobSink { } }, new WebActivity { Name = sendSuccessEmailActivity, Method = WebActivityMethod.POST, Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000", Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"), DependsOn = new List<ActivityDependency> { new ActivityDependency { Activity = copyBlobActivity, DependencyConditions = new List<String> { "Succeeded" } } } }, new WebActivity { Name = sendFailEmailActivity, Method =WebActivityMethod.POST, Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000", Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"), DependsOn = new List<ActivityDependency> { new ActivityDependency { Activity = copyBlobActivity, DependencyConditions = new List<String> { "Failed" } } } } } }; Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings)); return resource; }
Lägg till följande rad i metoden
Main
som skapar pipelinen:client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
Parametrar
Det första avsnittet i vår pipelinekod definierar parametrar.
sourceBlobContainer
. Källblobdatauppsättningen använder den här parametern i pipelinen.sinkBlobContainer
. Datauppsättningen för mottagarblob använder den här parametern i pipelinen.receiver
. De två webbaktiviteterna i pipelinen som skickar lyckade eller misslyckade e-postmeddelanden till mottagaren använder den här parametern.
Parameters = new Dictionary<string, ParameterSpecification>
{
{ "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
{ "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
{ "receiver", new ParameterSpecification { Type = ParameterType.String } }
},
Webbaktivitet
Webbaktiviteten tillåter ett anrop till valfri REST-slutpunkt. Mer information om aktiviteten finns i Webbaktivitet i Azure Data Factory. Den här pipelinen använder en webbaktivitet för att anropa Logic Apps e-postarbetsflöde. Du skapar två webbaktiviteter: en som anropar arbetsflödet CopySuccessEmail
och en som anropar CopyFailWorkFlow
.
new WebActivity
{
Name = sendCopyEmailActivity,
Method = WebActivityMethod.POST,
Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
DependsOn = new List<ActivityDependency>
{
new ActivityDependency
{
Activity = copyBlobActivity,
DependencyConditions = new List<String> { "Succeeded" }
}
}
}
I egenskapen Url
klistrar du in HTTP POST-URL-slutpunkterna från dina Logic Apps-arbetsflöden. I egenskapen Body
skickar du en instans av EmailRequest
klassen. E-postbegäran innehåller följande egenskaper:
- Meddelande. Skickar värdet för
@{activity('CopyBlobtoBlob').output.dataWritten
. Öppnar en egenskap för den tidigare kopieringsaktiviteten och skickar värdetdataWritten
för . Vid ett fel skickas felutdata i stället för@{activity('CopyBlobtoBlob').error.message
. - Namn på datafabrik. Skickar värdet för
@{pipeline().DataFactory}
Den här systemvariabeln gör att du kan komma åt motsvarande datafabriksnamn. En lista över systemvariabler finns i Systemvariabler. - Pipelinenamn. Skickar värdet för
@{pipeline().Pipeline}
. Med den här systemvariabeln kan du komma åt motsvarande pipelinenamn. - Mottagare. Skickar värdet för
"@pipeline().parameters.receiver"
. Kommer åt pipelineparametrarna.
Den här koden skapar ett nytt aktivitetsberoende som är beroende av den tidigare kopieringsaktiviteten.
Skapa en pipelinekörning
Lägg till följande kod i metoden Main
som utlöser en pipelinekörning.
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
{ "sourceBlobContainer", inputBlobPath },
{ "sinkBlobContainer", outputBlobPath },
{ "receiver", emailReceiver }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
Main-klass
Den slutliga Main
metoden bör se ut så här.
// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Factory df = CreateOrUpdateDataFactory(client);
client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
{ "sourceBlobContainer", inputBlobPath },
{ "sinkBlobContainer", outputBlobPath },
{ "receiver", emailReceiver }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
Skapa och kör programmet för att utlösa en pipelinekörning!
Övervaka en pipelinekörning
Lägg till följande kod i metoden
Main
:// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); PipelineRun pipelineRun; while (true) { pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress") System.Threading.Thread.Sleep(15000); else break; }
Den här koden kontrollerar kontinuerligt statusen för körningen tills den har kopierat data.
Lägg till följande kod i metoden
Main
som hämtar information om kopieringsaktivitetskörning, till exempel storleken på data som lästs/skrivits:// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun( resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList(); if (pipelineRun.Status == "Succeeded") { Console.WriteLine(activityRuns.First().Output); //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons); } else Console.WriteLine(activityRuns.First().Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
Kör koden
Skapa och starta programmet och kontrollera sedan pipelinekörningen.
Programmet visar förloppet för att skapa datafabrik, länkad tjänst, datauppsättningar, pipeline och pipelinekörning. Sedan kontrolleras status för pipelinekörningen. Vänta tills du ser information om körningen av kopieringsaktiviteten med storlek för lästa/skrivna data. Använd sedan verktyg som Azure Storage Explorer för att kontrollera att bloben kopierades till outputBlobPath från inputBlobPath som du angav i variabler.
Dina utdata bör likna följande exempel:
Creating data factory DFTutorialTest...
{
"location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
}
}
Creating dataset SourceStorageDataset...
{
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"type": "Expression",
"value": "@pipeline().parameters.sourceBlobContainer"
},
"fileName": "input.txt"
},
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureStorageLinkedService"
}
}
Creating dataset SinkStorageDataset...
{
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"type": "Expression",
"value": "@pipeline().parameters.sinkBlobContainer"
}
},
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureStorageLinkedService"
}
}
Creating pipeline Adfv2TutorialBranchCopy...
{
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
}
},
"inputs": [
{
"type": "DatasetReference",
"referenceName": "SourceStorageDataset"
}
],
"outputs": [
{
"type": "DatasetReference",
"referenceName": "SinkStorageDataset"
}
],
"name": "CopyBlobtoBlob"
},
{
"type": "WebActivity",
"typeProperties": {
"method": "POST",
"url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
"body": {
"message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
"dataFactoryName": "@{pipeline().DataFactory}",
"pipelineName": "@{pipeline().Pipeline}",
"receiver": "@pipeline().parameters.receiver"
}
},
"name": "SendSuccessEmailActivity",
"dependsOn": [
{
"activity": "CopyBlobtoBlob",
"dependencyConditions": [
"Succeeded"
]
}
]
},
{
"type": "WebActivity",
"typeProperties": {
"method": "POST",
"url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
"body": {
"message": "@{activity('CopyBlobtoBlob').error.message}",
"dataFactoryName": "@{pipeline().DataFactory}",
"pipelineName": "@{pipeline().Pipeline}",
"receiver": "@pipeline().parameters.receiver"
}
},
"name": "SendFailEmailActivity",
"dependsOn": [
{
"activity": "CopyBlobtoBlob",
"dependencyConditions": [
"Failed"
]
}
]
}
],
"parameters": {
"sourceBlobContainer": {
"type": "String"
},
"sinkBlobContainer": {
"type": "String"
},
"receiver": {
"type": "String"
}
}
}
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
"dataRead": 20,
"dataWritten": 20,
"copyDuration": 4,
"throughput": 0.01,
"errors": [],
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}
Press any key to exit...
Relaterat innehåll
Du har gjort följande uppgifter i den här självstudien:
- Skapa en datafabrik
- Skapa en länkad Azure Storage-tjänst
- Skapa en Azure Blob-datauppsättning
- Skapa en pipeline som innehåller en kopieringsaktivitet och en webbaktivitet
- Skicka utdata för aktiviteter till efterföljande aktiviteter
- Använda parameteröverföring och systemvariabler
- Starta en pipelinekörning
- Övervaka pipelinen och aktivitetskörningar
Du kan nu fortsätta till avsnittet Begrepp för mer information om Azure Data Factory.