Sdílet prostřednictvím


Větvení a řetězení aktivit v kanálech Data Factory

PLATÍ PRO: Azure Data Factory Azure Synapse Analytics

Tip

Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.

V tomto kurzu vytvoříte kanál služby Data Factory, který předvádí některé funkce toku řízení. Tento kanál zkopíruje z kontejneru ve službě Azure Blob Storage do jiného kontejneru ve stejném účtu úložiště. Pokud aktivita kopírování proběhne úspěšně, kanál odešle podrobnosti o úspěšné operaci kopírování v e-mailu. Tyto informace můžou zahrnovat množství zapsaných dat. Pokud aktivita kopírování selže, odešle podrobnosti o selhání kopírování, jako je například chybová zpráva, v e-mailu. V rámci tohoto kurzu se dozvíte, jak předávat parametry.

Tento obrázek poskytuje přehled scénáře:

Diagram znázorňuje Službu Azure Blob Storage, což je cíl kopie, která při úspěchu odešle e-mail s podrobnostmi nebo při selhání odešle e-mail s podrobnostmi o chybě.

V tomto kurzu se dozvíte, jak provádět následující úlohy:

  • Vytvoření datové továrny
  • Vytvoření propojené služby Azure Storage
  • Vytvoření datové sady Azure Blob
  • Vytvoření kanálu, který obsahuje aktivitu kopírování a aktivitu webu
  • Odeslání výstupů aktivit následným aktivitám
  • Použití předávání parametrů a systémových proměnných
  • Spuštění kanálu
  • Monitorování spuštění aktivit a kanálu

Tento kurz používá .NET SDK. K interakci se službou Azure Data Factory můžete použít jiné mechanismy. Rychlé starty služby Data Factory najdete v 5minutových rychlých startech.

Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.

Požadavky

  • Účet služby Azure Storage. Úložiště objektů blob použijete jako zdrojové úložiště dat. Pokud nemáte účet úložiště Azure, přečtěte si téma Vytvoření účtu úložiště.
  • Azure Storage Explorer. Pokud chcete tento nástroj nainstalovat, přečtěte si Průzkumník služby Azure Storage.
  • Azure SQL Database Tuto databázi použijete jako úložiště dat jímky. Pokud nemáte databázi ve službě Azure SQL Database, přečtěte si téma Vytvoření databáze ve službě Azure SQL Database.
  • Visual Studio. Tento článek používá Visual Studio 2019.
  • Azure .NET SDK. Stáhněte a nainstalujte sadu Azure .NET SDK.

Seznam oblastí Azure, ve kterých je služba Data Factory aktuálně dostupná, najdete v tématu Produkty dostupné v jednotlivých oblastech. Úložiště a výpočty dat můžou být v jiných oblastech. Úložiště zahrnují Azure Storage a Azure SQL Database. Výpočetní prostředky zahrnují HDInsight, které služba Data Factory používá.

Vytvořte aplikaci, jak je popsáno v tématu Vytvoření aplikace Microsoft Entra. Podle pokynů ve stejném článku přiřaďte aplikaci k roli Přispěvatel . Pro pozdější části tohoto kurzu budete potřebovat několik hodnot, například ID aplikace (klienta) a ID adresáře (tenanta).

Vytvoření tabulky objektů blob

  1. Otevřete textový editor. Zkopírujte následující text a uložte ho místně jako input.txt.

    Ethel|Berg
    Tamika|Walsh
    
  2. Otevřete Průzkumník služby Azure Storage. Rozbalte svůj účet úložiště. Pravým tlačítkem klikněte na Kontejnery objektů blob a vyberte Vytvořit kontejner objektů blob.

  3. Pojmenujte nový kontejner adfv2branch a vyberte Nahrát a přidejte do kontejneru input.txt soubor.

Vytvoření projektu sady Visual Studio

Vytvoření konzolové aplikace C# .NET:

  1. Spusťte Visual Studio a vyberte Vytvořit nový projekt.
  2. V části Vytvořit nový projekt zvolte Konzolová aplikace (.NET Framework) pro C# a vyberte Další.
  3. Pojmenujte projekt ADFv2BranchTutorial.
  4. Vyberte .NET verze 4.5.2 nebo novější a pak vyberte Vytvořit.

Instalace balíčků NuGet

  1. Vyberte nástroje>NuGet Správce balíčků> Správce balíčků konzoly.

  2. V konzole Správce balíčků spusťte následující příkazy pro instalaci balíčků. Podrobnosti najdete v balíčku NuGet Microsoft.Azure.Management.DataFactory.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Vytvoření klienta datové továrny

  1. Otevřete Program.cs a přidejte následující příkazy:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Přidejte tyto statické proměnné do Program třídy. Zástupné znaky nahraďte vlastními hodnotami.

    // Set variables
    static string tenantID = "<tenant ID>";
    static string applicationId = "<application ID>";
    static string authenticationKey = "<Authentication key for your application>";
    static string subscriptionId = "<Azure subscription ID>";
    static string resourceGroup = "<Azure resource group name>";
    
    static string region = "East US";
    static string dataFactoryName = "<Data factory name>";
    
    // Specify the source Azure Blob information
    static string storageAccount = "<Azure Storage account name>";
    static string storageKey = "<Azure Storage account key>";
    // confirm that you have the input.txt file placed in th input folder of the adfv2branch container.
    static string inputBlobPath = "adfv2branch/input";
    static string inputBlobName = "input.txt";
    static string outputBlobPath = "adfv2branch/output";
    static string emailReceiver = "<specify email address of the receiver>";
    
    static string storageLinkedServiceName = "AzureStorageLinkedService";
    static string blobSourceDatasetName = "SourceStorageDataset";
    static string blobSinkDatasetName = "SinkStorageDataset";
    static string pipelineName = "Adfv2TutorialBranchCopy";
    
    static string copyBlobActivity = "CopyBlobtoBlob";
    static string sendFailEmailActivity = "SendFailEmailActivity";
    static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
    
  3. Do metody Main přidejte následující kód. Tento kód vytvoří instanci DataFactoryManagementClient třídy. Tento objekt pak použijete k vytvoření datové továrny, propojené služby, datových sad a kanálu. Tento objekt můžete také použít k monitorování podrobností o spuštění kanálu.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Vytvoření datové továrny

  1. Přidejte do souboru Program.cs metoduCreateOrUpdateDataFactory:

    static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating data factory " + dataFactoryName + "...");
        Factory resource = new Factory
        {
            Location = region
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
    
        Factory response;
        {
            response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource);
        }
    
        while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
        {
            System.Threading.Thread.Sleep(1000);
        }
        return response;
    }
    
  2. Do metody, která vytvoří datovou továrnu, přidejte následující řádek Main :

    Factory df = CreateOrUpdateDataFactory(client);
    

Vytvoření propojené služby Azure Storage

  1. Přidejte do souboru Program.cs metoduStorageLinkedServiceDefinition:

    static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client)
    {
       Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
       AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService
       {
           ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
       };
       Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
       LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName);
       return linkedService;
    }
    
  2. Do metody, která vytvoří propojenou službu Azure Storage, přidejte následující řádek Main :

    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
    

Další informace o podporovaných vlastnostech a podrobnostech naleznete v tématu Vlastnosti propojené služby.

Vytvoření datových sad

V této části vytvoříte dvě datové sady, jednu pro zdroj a jednu pro jímku.

Vytvoření datové sady pro zdrojový objekt blob Azure

Přidejte metodu , která vytvoří datovou sadu objektů blob Azure. Další informace o podporovaných vlastnostech a podrobnostech najdete v tématu Vlastnosti datové sady Azure Blob.

Přidejte do souboru Program.cs metoduSourceBlobDatasetDefinition:

static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
    Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
    AzureBlobDataset blobDataset = new AzureBlobDataset
    {
        FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
        FileName = inputBlobName,
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        }
    };
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
    DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
    return dataset;
}

Nadefinujete datovou sadu, která představuje zdrojová data v objektu blob Azure. Tato datová sada objektů blob odkazuje na propojenou službu Azure Storage podporovanou v předchozím kroku. Datová sada objektů blob popisuje umístění objektu blob, ze které se má kopírovat: FolderPath a FileName.

Všimněte si použití parametrů pro FolderPath. sourceBlobContainer je název parametru a výraz se nahradí hodnotami předanými při spuštění kanálu. Syntaxe pro definování parametrů je @pipeline().parameters.<parameterName>

Vytvoření datové sady pro azure blob jímky

  1. Přidejte do souboru Program.cs metoduSourceBlobDatasetDefinition:

    static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating dataset " + blobSinkDatasetName + "...");
        AzureBlobDataset blobDataset = new AzureBlobDataset
        {
            FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" },
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = storageLinkedServiceName
            }
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
        DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName);
        return dataset;
    }
    
  2. Do metody, která vytvoří zdroj objektů blob Azure i datové sady jímky, přidejte následující kód Main .

    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
    
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
    

Vytvoření třídy EmailRequest v C#

V projektu jazyka C# vytvořte třídu s názvem EmailRequest. Tato třída definuje, jaké vlastnosti kanál odesílá v požadavku textu při odesílání e-mailu. V tomto kurzu kanál do e-mailu odešle čtyři vlastnosti:

  • Message. Text e-mailu Pro úspěšnou kopii tato vlastnost obsahuje množství zapsaných dat. U neúspěšné kopie obsahuje tato vlastnost podrobnosti o chybě.
  • Název datové továrny Název datové továrny.
  • Název kanálu Název kanálu.
  • Přijímač. Parametr, který prochází. Tato vlastnost určuje příjemce e-mailu.
    class EmailRequest
    {
        [Newtonsoft.Json.JsonProperty(PropertyName = "message")]
        public string message;

        [Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
        public string dataFactoryName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
        public string pipelineName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
        public string receiver;

        public EmailRequest(string input, string df, string pipeline, string receiverName)
        {
            message = input;
            dataFactoryName = df;
            pipelineName = pipeline;
            receiver = receiverName;
        }
    }

Vytvoření koncových bodů pracovního postupu pro e-maily

K aktivaci odesílání e-mailu použijete Azure Logic Apps k definování pracovního postupu. Další informace najdete v tématu Vytvoření ukázkového pracovního postupu aplikace logiky Consumption.

Pracovní postup pro e-maily s informací o úspěchu

Na webu Azure Portal vytvořte pracovní postup aplikace logiky s názvem CopySuccessEmail. Přidejte trigger požadavku s názvem Při přijetí požadavku HTTP. V triggeru požadavku vyplňte do pole Schématu JSON textu požadavku následující kód JSON:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

Váš pracovní postup vypadá přibližně jako v následujícím příkladu:

Pracovní postup pro e-maily s informací o úspěchu

Tento obsah JSON odpovídá EmailRequest třídě, kterou jste vytvořili v předchozí části.

Přidejte akci Office 365 Outlook s názvem Odeslat e-mail. Pro tuto akci si přizpůsobte, jak chcete e-mail formátovat, pomocí vlastností předaných ve schématu JSON textu požadavku. Tady je příklad:

Návrhář pracovního postupu s akcí Odeslat e-mail

Po uložení pracovního postupu zkopírujte a uložte hodnotu adresy URL HTTP POST z triggeru.

Pracovní postup pro e-maily s informací o úspěchu

Naklonujte CopySuccessEmail pracovní postup aplikace logiky do nového pracovního postupu s názvem CopyFailEmail. V triggeru požadavku je schéma JSON textu požadavku stejné. Změňte formát e-mailu, například Subject, tak, aby to odpovídalo neúspěchu. Zde je příklad:

Návrhář pracovního postupu a pracovní postup neúspěšného e-mailu

Po uložení pracovního postupu zkopírujte a uložte hodnotu adresy URL HTTP POST z triggeru.

Teď byste měli mít dvě adresy URL pracovního postupu, například následující příklady:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

Vytvořit kanál

Vraťte se do projektu v sadě Visual Studio. Teď přidáme kód, který vytvoří kanál s aktivitou kopírování a DependsOn vlastností. V tomto kurzu kanál obsahuje jednu aktivitu, aktivitu kopírování, která přebírá datovou sadu objektů blob jako zdroj a jinou datovou sadu objektů blob jako jímku. Pokud aktivita kopírování proběhne úspěšně nebo selže, volá různé e-mailové úlohy.

V tomto kanálu použijete následující funkce:

  • Parametry
  • Webová aktivita
  • Závislost aktivit
  • Použití výstupu z aktivity jako vstupu do jiné aktivity
  1. Přidejte tuto metodu do projektu. Další podrobnosti najdete v následujících částech.

    static PipelineResource PipelineDefinition(DataFactoryManagementClient client)
            {
                Console.WriteLine("Creating pipeline " + pipelineName + "...");
                PipelineResource resource = new PipelineResource
                {
                    Parameters = new Dictionary<string, ParameterSpecification>
                    {
                        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    
                    },
                    Activities = new List<Activity>
                    {
                        new CopyActivity
                        {
                            Name = copyBlobActivity,
                            Inputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSourceDatasetName
                                }
                            },
                            Outputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSinkDatasetName
                                }
                            },
                            Source = new BlobSource { },
                            Sink = new BlobSink { }
                        },
                        new WebActivity
                        {
                            Name = sendSuccessEmailActivity,
                            Method = WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Succeeded" }
                                }
                            }
                        },
                        new WebActivity
                        {
                            Name = sendFailEmailActivity,
                            Method =WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Failed" }
                                }
                            }
                        }
                    }
                };
                Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
                return resource;
            }
    
  2. Do metody, která vytvoří kanál, přidejte následující řádek Main :

    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
    

Parametry

První část kódu kanálu definuje parametry.

  • sourceBlobContainer. Zdrojová datová sada objektů blob využívá tento parametr v kanálu.
  • sinkBlobContainer. Datová sada objektů blob jímky využívá tento parametr v kanálu.
  • receiver. Tento parametr používají dvě webové aktivity v kanálu, které odesílají e-maily o úspěchu nebo neúspěchu příjemci.
Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    },

Webová aktivita

Webová aktivita umožňuje volání libovolného koncového bodu REST. Další informace o aktivitě najdete v tématu Webová aktivita ve službě Azure Data Factory. Tento kanál používá webovou aktivitu k volání pracovního postupu e-mailu Logic Apps. Vytvoříte dvě webové aktivity: jednu, která volá CopySuccessEmail pracovní postup, a jednu, která volá CopyFailWorkFlow.

        new WebActivity
        {
            Name = sendCopyEmailActivity,
            Method = WebActivityMethod.POST,
            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
            DependsOn = new List<ActivityDependency>
            {
                new ActivityDependency
                {
                    Activity = copyBlobActivity,
                    DependencyConditions = new List<String> { "Succeeded" }
                }
            }
        }

Url Do vlastnosti vložte koncové body adresy URL HTTP POST z pracovních postupů Logic Apps. Body Ve vlastnosti předejte instanci EmailRequest třídy. Obsahuje následující vlastnosti:

  • Message. Předává hodnotu @{activity('CopyBlobtoBlob').output.dataWritten. Přistupuje k vlastnosti předchozí aktivity kopírování a předává hodnotu dataWritten. V případě neúspěchu předejte výstup chyby místo @{activity('CopyBlobtoBlob').error.message.
  • Název datové továrny. Předá hodnotu @{pipeline().DataFactory} Této systémové proměnné, která umožňuje přístup k odpovídajícímu názvu datové továrny. Seznam systémových proměnných najdete v tématu Systémové proměnné.
  • Název kanálu Předává hodnotu @{pipeline().Pipeline}. Tato systémová proměnná umožňuje přístup k odpovídajícímu názvu kanálu.
  • Přijímač. Předává hodnotu "@pipeline().parameters.receiver". Přistupuje k parametrům kanálu.

Tento kód vytvoří novou závislost aktivity, která závisí na předchozí aktivitě kopírování.

Vytvoření spuštění kanálu

Do metody, která aktivuje spuštění kanálu, přidejte následující kód Main .

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Hlavní třída

Main Konečná metoda by měla vypadat takto.

// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Factory df = CreateOrUpdateDataFactory(client);

client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));

Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Sestavte a spusťte program pro aktivaci spuštění kanálu!

Monitorování spuštění kanálu

  1. Do metody Main přidejte následující kód:

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    

    Tento kód nepřetržitě kontroluje stav spuštění, dokud nedokončí kopírování dat.

  2. Do metody, která načte podrobnosti o spuštění aktivity kopírování, přidejte následující kód Main , například velikost načtených a zapsaných dat:

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList();
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(activityRuns.First().Output);
        //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons);
    }
    else
        Console.WriteLine(activityRuns.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Spuštění kódu

Sestavte a spusťte aplikaci a potom ověřte spuštění kanálu.

Aplikace zobrazí průběh vytváření datové továrny, propojené služby, datových sad, kanálu a spuštění kanálu. Potom zkontroluje stav spuštění kanálu. Počkejte, dokud aktivita kopírování nezobrazí údaje o velikosti načtených/zapsaných dat. Potom pomocí nástrojů, jako je Průzkumník služby Azure Storage, zkontrolujte, že se objekt blob zkopíroval do výstupuBlobPath ze inputBlobPath, jak jste zadali v proměnných.

Výstup by měl vypadat přibližně jako v následující ukázce:

Creating data factory DFTutorialTest...
{
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "type": "AzureStorage",
  "typeProperties": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
  }
}
Creating dataset SourceStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sourceBlobContainer"
    },
    "fileName": "input.txt"
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating dataset SinkStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sinkBlobContainer"
    }
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating pipeline Adfv2TutorialBranchCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SourceStorageDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SinkStorageDataset"
          }
        ],
        "name": "CopyBlobtoBlob"
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendSuccessEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').error.message}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendFailEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      },
      "receiver": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 20,
  "dataWritten": 20,
  "copyDuration": 4,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}

Press any key to exit...

V tomto kurzu jste provedli následující úlohy:

  • Vytvoření datové továrny
  • Vytvoření propojené služby Azure Storage
  • Vytvoření datové sady Azure Blob
  • Vytvoření kanálu, který obsahuje aktivitu kopírování a aktivitu webu
  • Odeslání výstupů aktivit následným aktivitám
  • Použití předávání parametrů a systémových proměnných
  • Spuštění kanálu
  • Monitorování spuštění aktivit a kanálu

Teď můžete pokračovat v části Koncepty, kde najdete další informace o službě Azure Data Factory.