Dela via


Branchning och kedjesammansättning av aktiviteter i en Data Factory-pipeline

GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics

Dricks

Prova Data Factory i Microsoft Fabric, en allt-i-ett-analyslösning för företag. Microsoft Fabric omfattar allt från dataflytt till datavetenskap, realtidsanalys, business intelligence och rapportering. Lär dig hur du startar en ny utvärderingsversion kostnadsfritt!

I den här självstudien skapar du en Data Factory-pipeline som visar vissa kontrollflödesfunktioner. Den här pipelinen kopierar från en container i Azure Blob Storage till en annan container i samma lagringskonto. Om kopieringsaktiviteten lyckas skickar pipelinen information om den lyckade kopieringsåtgärden i ett e-postmeddelande. Den informationen kan omfatta mängden data som skrivits. Om kopieringsaktiviteten misslyckas skickar den information om kopieringsfelet, till exempel felmeddelandet, i ett e-postmeddelande. I självstudiekursen visas olika exempel på hur du skickar parametrar.

Den här bilden ger en översikt över scenariot:

Diagrammet visar Azure Blob Storage, som är målet för en kopia, som vid lyckat resultat skickar ett e-postmeddelande med information eller vid fel skickar ett e-postmeddelande med felinformation.

Den här självstudien visar hur du utför följande uppgifter:

  • Skapa en datafabrik
  • Skapa en länkad Azure Storage-tjänst
  • Skapa en Azure Blob-datauppsättning
  • Skapa en pipeline som innehåller en kopieringsaktivitet och en webbaktivitet
  • Skicka utdata för aktiviteter till efterföljande aktiviteter
  • Använda parameteröverföring och systemvariabler
  • Starta en pipelinekörning
  • Övervaka pipelinen och aktivitetskörningar

I den här självstudiekursen används .NET SDK. Du kan använda andra mekanismer för att interagera med Azure Data Factory. Snabbstarter för Data Factory finns i snabbstarter på 5 minuter.

Om du inte har någon Azure-prenumeration skapar du ett kostnadsfritt konto innan du börjar.

Förutsättningar

  • Azure Storage-konto. Du använder bloblagring som källdatalager. Om du inte har något Azure-lagringskonto kan du läsa Skapa ett lagringskonto.
  • Azure Storage Explorer. Information om hur du installerar det här verktyget finns i Azure Storage Explorer.
  • Azure SQL Database. Du använder databasen som mottagare för datalagringen. Om du inte har någon databas i Azure SQL Database läser du Skapa en databas i Azure SQL Database.
  • Visual Studio. Den här artikeln använder Visual Studio 2019.
  • Azure .NET SDK. Ladda ned och installera Azure .NET SDK.

En lista över Azure-regioner där Data Factory för närvarande är tillgängligt finns i Produkter tillgängliga per region. Datalager och beräkningar kan finnas i andra regioner. Bland butikerna finns Azure Storage och Azure SQL Database. Beräkningen omfattar HDInsight, som Data Factory använder.

Skapa ett program enligt beskrivningen i Skapa ett Microsoft Entra-program. Tilldela programmet rollen Deltagare genom att följa anvisningarna i samma artikel. Du behöver flera värden för senare delar av den här självstudien, till exempel program-ID (klient)-ID och katalog-ID (klientorganisation).

Skapa en blobtabell

  1. Öppna en textredigerare. Kopiera följande text och spara den lokalt som input.txt.

    Ethel|Berg
    Tamika|Walsh
    
  2. Öppna Azure Storage Explorer. Expandera ditt lagringskonto. Högerklicka på Blobcontainrar och välj Skapa blobcontainer.

  3. Ge den nya containern namnet adfv2branch och välj Ladda upp för att lägga till filen input.txt i containern.

Skapa Visual Studio-projekt

Skapa ett C# .NET-konsolprogram:

  1. Starta Visual Studio och välj Skapa ett nytt projekt.
  2. I Skapa ett nytt projekt väljer du Konsolapp (.NET Framework) för C# och väljer Nästa.
  3. Ge projektet namnet ADFv2BranchTutorial.
  4. Välj .NET version 4.5.2 eller senare och välj sedan Skapa.

Installera NuGet-paket

  1. Välj Verktyg>NuGet Package Manager Package Manager-konsolen.>

  2. Kör följande kommandon i Package Manager Console för att installera paket. Mer information finns i Nuget-paketet Microsoft.Azure.Management.DataFactory.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Skapa en datafabriksklient

  1. Öppna Program.cs och lägg till följande instruktioner:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Lägg till dessa statiska variabler i Program klassen. Ersätt platshållarna med dina egna värden.

    // Set variables
    static string tenantID = "<tenant ID>";
    static string applicationId = "<application ID>";
    static string authenticationKey = "<Authentication key for your application>";
    static string subscriptionId = "<Azure subscription ID>";
    static string resourceGroup = "<Azure resource group name>";
    
    static string region = "East US";
    static string dataFactoryName = "<Data factory name>";
    
    // Specify the source Azure Blob information
    static string storageAccount = "<Azure Storage account name>";
    static string storageKey = "<Azure Storage account key>";
    // confirm that you have the input.txt file placed in th input folder of the adfv2branch container.
    static string inputBlobPath = "adfv2branch/input";
    static string inputBlobName = "input.txt";
    static string outputBlobPath = "adfv2branch/output";
    static string emailReceiver = "<specify email address of the receiver>";
    
    static string storageLinkedServiceName = "AzureStorageLinkedService";
    static string blobSourceDatasetName = "SourceStorageDataset";
    static string blobSinkDatasetName = "SinkStorageDataset";
    static string pipelineName = "Adfv2TutorialBranchCopy";
    
    static string copyBlobActivity = "CopyBlobtoBlob";
    static string sendFailEmailActivity = "SendFailEmailActivity";
    static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
    
  3. Lägg till följande kod i metoden Main. Den här koden skapar en instans av DataFactoryManagementClient klassen. Sedan använder du det här objektet för att skapa datafabrik, länkad tjänst, datauppsättningar och pipeline. Du kan också använda det här objektet för att övervaka pipelinekörningsinformationen.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Skapa en datafabrik

  1. Lägg till en CreateOrUpdateDataFactory metod i din Program.cs-fil :

    static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating data factory " + dataFactoryName + "...");
        Factory resource = new Factory
        {
            Location = region
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
    
        Factory response;
        {
            response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource);
        }
    
        while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
        {
            System.Threading.Thread.Sleep(1000);
        }
        return response;
    }
    
  2. Lägg till följande rad i metoden Main som skapar en datafabrik:

    Factory df = CreateOrUpdateDataFactory(client);
    

Skapa en länkad Azure Storage-tjänst

  1. Lägg till en StorageLinkedServiceDefinition metod i din Program.cs-fil :

    static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client)
    {
       Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
       AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService
       {
           ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
       };
       Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
       LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName);
       return linkedService;
    }
    
  2. Lägg till följande rad i metoden Main som skapar en länkad Azure Storage-tjänst:

    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
    

Mer information om egenskaper och information som stöds finns i Länkade tjänstegenskaper.

Skapa datauppsättningar

I det här avsnittet skapar du två datauppsättningar, en för källan och en för mottagaren.

Skapa en datauppsättning för en Azure-källblob

Lägg till en metod som skapar en Azure Blob-datauppsättning. Mer information om egenskaper och information som stöds finns i Egenskaper för Azure Blob-datauppsättning.

Lägg till en SourceBlobDatasetDefinition metod i din Program.cs-fil :

static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
    Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
    AzureBlobDataset blobDataset = new AzureBlobDataset
    {
        FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
        FileName = inputBlobName,
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        }
    };
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
    DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
    return dataset;
}

Du definierar en datauppsättning som representerar källdata i Azure Blob. Den här blobdatauppsättningen refererar till den länkade Azure Storage-tjänst som stöds i föregående steg. Blobdatauppsättningen beskriver platsen för bloben som ska kopieras från: FolderPath och FileName.

Observera användningen av parametrar för FolderPath. sourceBlobContainer är namnet på parametern och uttrycket ersätts med de värden som skickas i pipelinekörningen. Syntaxen för att definiera parametrar är @pipeline().parameters.<parameterName>

Skapa en datauppsättning för en azure-mottagarblob

  1. Lägg till en SourceBlobDatasetDefinition metod i din Program.cs-fil :

    static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating dataset " + blobSinkDatasetName + "...");
        AzureBlobDataset blobDataset = new AzureBlobDataset
        {
            FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" },
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = storageLinkedServiceName
            }
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
        DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName);
        return dataset;
    }
    
  2. Lägg till följande kod i metoden Main som skapar både Azure Blob-käll- och mottagardatauppsättningar.

    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
    
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
    

Skapa en C#-klass: EmailRequest

I ditt C#-projekt skapar du en klass med namnet EmailRequest. Den här klassen definierar vilka egenskaper pipelinen skickar i brödtextbegäran när du skickar ett e-postmeddelande. I den här självstudiekursen skickar pipelinen fyra egenskaper från pipelinen till e-postmeddelandet:

  • Meddelande. Brödtext för e-postmeddelandet. För en lyckad kopia innehåller den här egenskapen mängden data som skrivits. För en misslyckad kopia innehåller den här egenskapen information om felet.
  • Namn på datafabrik. Namnet på datafabriken.
  • Pipelinenamn. Namnet på pipeline.
  • Mottagare. Parameter som passerar igenom. Den här egenskapen anger mottagaren av e-postmeddelandet.
    class EmailRequest
    {
        [Newtonsoft.Json.JsonProperty(PropertyName = "message")]
        public string message;

        [Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
        public string dataFactoryName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
        public string pipelineName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
        public string receiver;

        public EmailRequest(string input, string df, string pipeline, string receiverName)
        {
            message = input;
            dataFactoryName = df;
            pipelineName = pipeline;
            receiver = receiverName;
        }
    }

Skapa slutpunkter för e-postarbetsflödet

Om du vill utlösa sändning av ett e-postmeddelande använder du Azure Logic Apps för att definiera arbetsflödet. Mer information finns i Skapa ett exempel på arbetsflöde för förbrukningslogikapp.

Lyckat e-postarbetsflöde

I Azure Portal skapar du ett logikapparbetsflöde med namnet CopySuccessEmail. Lägg till utlösaren Förfrågning med namnet När en HTTP-begäran tas emot. I utlösaren Begäran fyller du i JSON-schemarutan Begärandetext med följande JSON:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

Arbetsflödet ser ut ungefär som i följande exempel:

Lyckat e-postarbetsflöde

Det här JSON-innehållet överensstämmer med den EmailRequest klass som du skapade i föregående avsnitt.

Lägg till office 365 Outlook-åtgärden med namnet Skicka ett e-postmeddelande. För den här åtgärden anpassar du hur du vill formatera e-postmeddelandet med hjälp av egenskaperna som skickas i JSON-schemat för begärandetext. Här är ett exempel:

Arbetsflödesdesigner med åtgärden Skicka ett e-postmeddelande.

När du har sparat arbetsflödet kopierar du och sparar VÄRDET FÖR HTTP POST-URL från utlösaren.

Arbetsflöde för e-postmeddelande om misslyckad kopiering

Klona logikappens CopySuccessEmail arbetsflöde till ett nytt arbetsflöde med namnet CopyFailEmail. I begärandeutlösaren är JSON-schemat för begärandetext detsamma. Ändra formatet för ditt e-postmeddelande som Subject för att skapa ett e-postmeddelande om att kopieringen misslyckats. Här är ett exempel:

Arbetsflödesdesignern och arbetsflödet för e-post som inte fungerar.

När du har sparat arbetsflödet kopierar du och sparar VÄRDET FÖR HTTP POST-URL från utlösaren.

Du bör nu ha två arbetsflödes-URL:er, som följande exempel:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

Skapa en pipeline

Gå tillbaka till projektet i Visual Studio. Nu ska vi lägga till koden som skapar en pipeline med en kopieringsaktivitet och DependsOn -egenskap. I den här självstudien innehåller pipelinen en aktivitet, en kopieringsaktivitet, som tar in blobdatauppsättningen som källa och en annan Blob-datauppsättning som mottagare. Om kopieringsaktiviteten lyckas eller misslyckas anropas olika e-postuppgifter.

I denna pipeline kan du använda följande funktioner:

  • Parametrar
  • Webbaktivitet
  • Aktivitetsberoende
  • Använda utdata från en aktivitet som indata till en annan aktivitet
  1. Lägg till den här metoden i projektet. Följande avsnitt innehåller mer information.

    static PipelineResource PipelineDefinition(DataFactoryManagementClient client)
            {
                Console.WriteLine("Creating pipeline " + pipelineName + "...");
                PipelineResource resource = new PipelineResource
                {
                    Parameters = new Dictionary<string, ParameterSpecification>
                    {
                        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    
                    },
                    Activities = new List<Activity>
                    {
                        new CopyActivity
                        {
                            Name = copyBlobActivity,
                            Inputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSourceDatasetName
                                }
                            },
                            Outputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSinkDatasetName
                                }
                            },
                            Source = new BlobSource { },
                            Sink = new BlobSink { }
                        },
                        new WebActivity
                        {
                            Name = sendSuccessEmailActivity,
                            Method = WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Succeeded" }
                                }
                            }
                        },
                        new WebActivity
                        {
                            Name = sendFailEmailActivity,
                            Method =WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Failed" }
                                }
                            }
                        }
                    }
                };
                Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
                return resource;
            }
    
  2. Lägg till följande rad i metoden Main som skapar pipelinen:

    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
    

Parametrar

Det första avsnittet i vår pipelinekod definierar parametrar.

  • sourceBlobContainer. Källblobdatauppsättningen använder den här parametern i pipelinen.
  • sinkBlobContainer. Datauppsättningen för mottagarblob använder den här parametern i pipelinen.
  • receiver. De två webbaktiviteterna i pipelinen som skickar lyckade eller misslyckade e-postmeddelanden till mottagaren använder den här parametern.
Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    },

Webbaktivitet

Webbaktiviteten tillåter ett anrop till valfri REST-slutpunkt. Mer information om aktiviteten finns i Webbaktivitet i Azure Data Factory. Den här pipelinen använder en webbaktivitet för att anropa Logic Apps e-postarbetsflöde. Du skapar två webbaktiviteter: en som anropar arbetsflödet CopySuccessEmail och en som anropar CopyFailWorkFlow.

        new WebActivity
        {
            Name = sendCopyEmailActivity,
            Method = WebActivityMethod.POST,
            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
            DependsOn = new List<ActivityDependency>
            {
                new ActivityDependency
                {
                    Activity = copyBlobActivity,
                    DependencyConditions = new List<String> { "Succeeded" }
                }
            }
        }

I egenskapen Url klistrar du in HTTP POST-URL-slutpunkterna från dina Logic Apps-arbetsflöden. I egenskapen Body skickar du en instans av EmailRequest klassen. E-postbegäran innehåller följande egenskaper:

  • Meddelande. Skickar värdet för @{activity('CopyBlobtoBlob').output.dataWritten. Öppnar en egenskap för den tidigare kopieringsaktiviteten och skickar värdet dataWrittenför . Vid ett fel skickas felutdata i stället för @{activity('CopyBlobtoBlob').error.message.
  • Namn på datafabrik. Skickar värdet för @{pipeline().DataFactory} Den här systemvariabeln gör att du kan komma åt motsvarande datafabriksnamn. En lista över systemvariabler finns i Systemvariabler.
  • Pipelinenamn. Skickar värdet för @{pipeline().Pipeline}. Med den här systemvariabeln kan du komma åt motsvarande pipelinenamn.
  • Mottagare. Skickar värdet för "@pipeline().parameters.receiver". Kommer åt pipelineparametrarna.

Den här koden skapar ett nytt aktivitetsberoende som är beroende av den tidigare kopieringsaktiviteten.

Skapa en pipelinekörning

Lägg till följande kod i metoden Main som utlöser en pipelinekörning.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Main-klass

Den slutliga Main metoden bör se ut så här.

// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Factory df = CreateOrUpdateDataFactory(client);

client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));

Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Skapa och kör programmet för att utlösa en pipelinekörning!

Övervaka en pipelinekörning

  1. Lägg till följande kod i metoden Main:

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    

    Den här koden kontrollerar kontinuerligt statusen för körningen tills den har kopierat data.

  2. Lägg till följande kod i metoden Main som hämtar information om kopieringsaktivitetskörning, till exempel storleken på data som lästs/skrivits:

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList();
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(activityRuns.First().Output);
        //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons);
    }
    else
        Console.WriteLine(activityRuns.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Kör koden

Skapa och starta programmet och kontrollera sedan pipelinekörningen.

Programmet visar förloppet för att skapa datafabrik, länkad tjänst, datauppsättningar, pipeline och pipelinekörning. Sedan kontrolleras status för pipelinekörningen. Vänta tills du ser information om körningen av kopieringsaktiviteten med storlek för lästa/skrivna data. Använd sedan verktyg som Azure Storage Explorer för att kontrollera att bloben kopierades till outputBlobPath från inputBlobPath som du angav i variabler.

Dina utdata bör likna följande exempel:

Creating data factory DFTutorialTest...
{
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "type": "AzureStorage",
  "typeProperties": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
  }
}
Creating dataset SourceStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sourceBlobContainer"
    },
    "fileName": "input.txt"
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating dataset SinkStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sinkBlobContainer"
    }
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating pipeline Adfv2TutorialBranchCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SourceStorageDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SinkStorageDataset"
          }
        ],
        "name": "CopyBlobtoBlob"
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendSuccessEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').error.message}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendFailEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      },
      "receiver": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 20,
  "dataWritten": 20,
  "copyDuration": 4,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}

Press any key to exit...

Du har gjort följande uppgifter i den här självstudien:

  • Skapa en datafabrik
  • Skapa en länkad Azure Storage-tjänst
  • Skapa en Azure Blob-datauppsättning
  • Skapa en pipeline som innehåller en kopieringsaktivitet och en webbaktivitet
  • Skicka utdata för aktiviteter till efterföljande aktiviteter
  • Använda parameteröverföring och systemvariabler
  • Starta en pipelinekörning
  • Övervaka pipelinen och aktivitetskörningar

Du kan nu fortsätta till avsnittet Begrepp för mer information om Azure Data Factory.