共用方式為


在 Data Factory 管道中將活動分支和鏈結

適用於:Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用 (部分機器翻譯)!

在本教學課程中,您會建立 Data Factory 管線來展示部分控制流程功能。 此管線會從 Azure Blob 儲存體中的一個容器複製到相同儲存體帳戶中的另一個容器。 如果複製活動成功,管線會在電子郵件中傳送成功複製作業的詳細資料。 該資訊可能包含寫入的資料量。 如果複製活動失敗,則管線會在電子郵件中傳送複製失敗的詳細資料,例如錯誤訊息。 在整個教學課程中,您會看到如何傳遞參數。

下圖提供此案例的概觀:

圖表顯示 Azure Blob 儲存體,這是複本的目標,在成功時,會傳送含有詳細數據的電子郵件,或在失敗時傳送含有錯誤詳細數據的電子郵件。

本教學課程將說明如何執行下列工作:

  • 建立資料處理站
  • 建立 Azure 儲存體連結服務
  • 建立 Azure Blob 資料集
  • 建立包含複製活動和網路活動的管道
  • 將活動的輸出傳送至後續的活動
  • 使用參數傳遞和系統變數
  • 啟動管道執行
  • 監視管道和活動執行

本教學課程使用 .NET SDK。 您可以使用其他機制與 Azure Data Factory 互動。 如需 Data Factory 的快速入門,請參閱 5 分鐘快速入門

如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶

必要條件

  • Azure 儲存體帳戶。 您會使用 Blob 儲存體作為來源資料存放區。 如果您沒有 Azure 儲存體帳戶,請參閱 建立儲存體帳戶
  • Azure 儲存體總管。 若要安裝此工具,請參閱 Azure 儲存體總管
  • Azure SQL Database。 您會使用資料庫作為接收資料存放區。 如果您在 Azure SQL Database 中沒有資料庫,請參閱在 Azure SQL Database 中建立資料庫
  • Visual Studio。 本文使用 Visual Studio 2019。
  • Azure .NET SDK。 下載並安裝 Azure .NET SDK

如需目前可使用 Data Factory 的 Azure 區域,請參閱依區域提供的產品。 資料存放區和計算可位於其他區域。 這些存放區包含 Azure 儲存體和 Azure SQL Database。 計算包含 Data Factory 所使用的 HDInsight。

依照建立 Microsoft Entra 應用程式 (部分機器翻譯) 中的說明建立應用程式。 依照同一篇文章中的指示,將應用程式指派給「參與者」角色。 您在本教學課程的後續部分將需要數個值,例如應用程式 (用戶端) 識別碼目錄 (租用戶) 識別碼

建立 Blob 資料表

  1. 開啟文字編輯器。 複製下列文字,並在本機位置將其儲存為 input.txt

    Ethel|Berg
    Tamika|Walsh
    
  2. 開啟 [Azure 儲存體總管]。 展開您的儲存體帳戶。 以滑鼠右鍵按一下 [Blob 容器],然後選取 [建立 Blob 容器]

  3. 將新容器命名為 adfv2branch,然後選取 [上傳] 將您的 input.txt 檔案新增至該容器。

建立 Visual Studio 專案

建立 C# .NET 主控台應用程式:

  1. 啟動 Visual Studio 並選取 [建立新專案]
  2. 在 [建立新專案] 中,針對 C# 選擇 [主控台應用程式 (.NET Framework)],然後選取 [下一步]
  3. 將專案命名為 ADFv2BranchTutorial
  4. 選取 [.NET 4.5.2 版] 或更新版本,然後選取 [建立]

安裝 NuGet 套件

  1. 選取 [工具]>[NuGet 套件管理員]>[套件管理員主控台]

  2. 在 [套件管理員主控台] 中執行下列命令,以安裝套件。 如需詳細資訊,請參閱 Microsoft.Azure.Management.DataFactory Nuget 套件

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

建立資料處理站用戶端

  1. 開啟 Program.cs,並新增下列陳述式:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. 將這些靜態變數新增至 Program 類別。 將預留位置取代為您自己的值。

    // Set variables
    static string tenantID = "<tenant ID>";
    static string applicationId = "<application ID>";
    static string authenticationKey = "<Authentication key for your application>";
    static string subscriptionId = "<Azure subscription ID>";
    static string resourceGroup = "<Azure resource group name>";
    
    static string region = "East US";
    static string dataFactoryName = "<Data factory name>";
    
    // Specify the source Azure Blob information
    static string storageAccount = "<Azure Storage account name>";
    static string storageKey = "<Azure Storage account key>";
    // confirm that you have the input.txt file placed in th input folder of the adfv2branch container.
    static string inputBlobPath = "adfv2branch/input";
    static string inputBlobName = "input.txt";
    static string outputBlobPath = "adfv2branch/output";
    static string emailReceiver = "<specify email address of the receiver>";
    
    static string storageLinkedServiceName = "AzureStorageLinkedService";
    static string blobSourceDatasetName = "SourceStorageDataset";
    static string blobSinkDatasetName = "SinkStorageDataset";
    static string pipelineName = "Adfv2TutorialBranchCopy";
    
    static string copyBlobActivity = "CopyBlobtoBlob";
    static string sendFailEmailActivity = "SendFailEmailActivity";
    static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
    
  3. 將下列程式碼新增至 Main 方法。 此程式碼會建立 DataFactoryManagementClient 類別的執行個體。 接著,您會使用此物件來建立資料處理站、連結服務、資料集和管線。 您也可以使用此物件來監視管線執行的詳細資料。

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

建立資料處理站

  1. CreateOrUpdateDataFactory 方法新增至您的 Program.cs 檔案:

    static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating data factory " + dataFactoryName + "...");
        Factory resource = new Factory
        {
            Location = region
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
    
        Factory response;
        {
            response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource);
        }
    
        while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
        {
            System.Threading.Thread.Sleep(1000);
        }
        return response;
    }
    
  2. 將以下這一行新增至 Main 方法,以建立資料處理站:

    Factory df = CreateOrUpdateDataFactory(client);
    

建立 Azure 儲存體連結服務

  1. StorageLinkedServiceDefinition 方法新增至您的 Program.cs 檔案:

    static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client)
    {
       Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
       AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService
       {
           ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
       };
       Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
       LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName);
       return linkedService;
    }
    
  2. 將以下這一行新增至 Main 方法,以建立 Azure 儲存體連結服務:

    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
    

若要進一步了解支援的屬性和詳細資料,請參閱連結服務屬性

建立資料集

在本節中,您會建立兩個資料集:一個用於來源,另一個用於接收。

建立來源 Azure Blob 的資料集

新增建立 Azure Blob 資料集的方法。 若要進一步了解支援的屬性和詳細資訊,請參閱 Azure Blob 資料集屬性

SourceBlobDatasetDefinition 方法新增至您的 Program.cs 檔案:

static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
    Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
    AzureBlobDataset blobDataset = new AzureBlobDataset
    {
        FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
        FileName = inputBlobName,
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        }
    };
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
    DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
    return dataset;
}

您可以定義資料集來代表 Azure Blob 中的來源資料。 此 Blob 資料集會參考先前的步驟中支援的 Azure 儲存體連結服務。 Blob 資料集會說明要從中複製 Blob 的位置:FolderPathFileName

請注意 FolderPath 的參數用法。 sourceBlobContainer 是參數的名稱,而運算式會取代為傳入管線執行中的值。 定義參數的語法是 @pipeline().parameters.<parameterName>

建立接收 Azure Blob 的資料集

  1. SourceBlobDatasetDefinition 方法新增至您的 Program.cs 檔案:

    static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating dataset " + blobSinkDatasetName + "...");
        AzureBlobDataset blobDataset = new AzureBlobDataset
        {
            FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" },
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = storageLinkedServiceName
            }
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
        DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName);
        return dataset;
    }
    
  2. 將下列程式碼新增至 Main 方法,以建立 Azure Blob 來源和接收資料集。

    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
    
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
    

建立 C# 類別:EmailRequest

在 C# 專案中,建立名為 EmailRequest 的類別。 此類別會定義在傳送電子郵件時,管線在本文要求中會傳送哪些屬性。 在本教學課程中,管道會將四個屬性從管道傳送至電子郵件:

  • 訊息。 電子郵件本文。 如果複製成功,此屬性會包含寫入的資料量。 如果複製失敗,則此屬性會包含錯誤的詳細資料。
  • Data Factory 名稱。 資料處理站的名稱。
  • 管線名稱。 管線的名稱。
  • 接收者。 傳遞的參數。 此屬性指定電子郵件的接收者。
    class EmailRequest
    {
        [Newtonsoft.Json.JsonProperty(PropertyName = "message")]
        public string message;

        [Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
        public string dataFactoryName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
        public string pipelineName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
        public string receiver;

        public EmailRequest(string input, string df, string pipeline, string receiverName)
        {
            message = input;
            dataFactoryName = df;
            pipelineName = pipeline;
            receiver = receiverName;
        }
    }

建立電子郵件工作流程端點

若要觸發傳送電子郵件,您需要使用 Azure Logic Apps (部分機器翻譯) 來定義工作流程。 如需詳細資訊,請參閱建立範例使用量邏輯應用程式工作流程 (部分機器翻譯)。

成功電子郵件工作流程

Azure 入口網站中,建立名為 CopySuccessEmail 的邏輯應用程式工作流程。 新增名為 [收到 HTTP 要求時] 的要求觸發程序。 在 [要求] 觸發程序中,使用下列 JSON 填入 [要求本文 JSON 結構描述] 方塊:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

您的工作流程會類似下列範例:

成功電子郵件工作流程

此 JSON 內容與您在上一節建立的 EmailRequest 類別一致。

新增名為 [傳送電子郵件] 的 [Office 365 Outlook] 動作。 針對此動作,使用傳入要求本文 JSON 結構描述中的屬性,自訂您要設定電子郵件格式的方式。 以下是範例:

具有名為 [傳送電子郵件] 動作的工作流程設計工具。

儲存工作流程後,請複製並儲存觸發程序中的 HTTP POST URL 值。

失敗電子郵件工作流程

CopySuccessEmail 邏輯應用程式工作流程複製到名為 CopyFailEmail 的新工作流程。 在 [要求] 觸發程序中,[要求本文 JSON 結構描述] 是相同的。 變更電子郵件的格式 (例如 Subject),以調整為適合失敗電子郵件。 以下是範例:

工作流程設計工具與失敗的電子郵件工作流程。

儲存工作流程後,請複製並儲存觸發程序中的 HTTP POST URL 值。

您現在應該有兩個工作流程 URL,如下列範例所示:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

建立新管線

返回您在 Visual Studio 中的專案。 我們現在將新增程式碼,以建立具有複製活動和 DependsOn 屬性的管線。 在本教學課程中,此管線包含一個活動 (複製活動),此活動以 Blob 資料集作為來源,並以另一個 Blob 資料集作為接收。 複製活動成功或失敗時,會呼叫不同的電子郵件工作。

在此管道中,您會使用下列功能:

  • 參數
  • 網路活動
  • 活動相依性
  • 使用一個活動的輸出作為其他活動的輸入
  1. 將此方法新增至您的專案。 下列各節將提供更多詳細資料。

    static PipelineResource PipelineDefinition(DataFactoryManagementClient client)
            {
                Console.WriteLine("Creating pipeline " + pipelineName + "...");
                PipelineResource resource = new PipelineResource
                {
                    Parameters = new Dictionary<string, ParameterSpecification>
                    {
                        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    
                    },
                    Activities = new List<Activity>
                    {
                        new CopyActivity
                        {
                            Name = copyBlobActivity,
                            Inputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSourceDatasetName
                                }
                            },
                            Outputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSinkDatasetName
                                }
                            },
                            Source = new BlobSource { },
                            Sink = new BlobSink { }
                        },
                        new WebActivity
                        {
                            Name = sendSuccessEmailActivity,
                            Method = WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Succeeded" }
                                }
                            }
                        },
                        new WebActivity
                        {
                            Name = sendFailEmailActivity,
                            Method =WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Failed" }
                                }
                            }
                        }
                    }
                };
                Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
                return resource;
            }
    
  2. 將以下這一行新增至 Main 方法以建立管線:

    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
    

參數

管線程式碼的第一個區段會定義參數。

  • sourceBlobContainer. 來源 Blob 資料集會在管線中使用此參數。
  • sinkBlobContainer. 接收 Blob 資料集會在管線中使用此參數。
  • receiver. 管線中將成功或失敗電子郵件傳送給接收者的兩個 Web 活動都會使用此參數。
Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    },

網路活動

Web 活動允許呼叫任何 REST 端點。 如需活動的詳細資訊,請參閱 Azure Data Factory 中的 Web 活動。 這個管線會使用 Web 活動來呼叫 Logic Apps 電子郵件工作流程。 您會建立兩個 Web 活動:一個會呼叫 CopySuccessEmail 工作流程,另一個會呼叫 CopyFailWorkFlow

        new WebActivity
        {
            Name = sendCopyEmailActivity,
            Method = WebActivityMethod.POST,
            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
            DependsOn = new List<ActivityDependency>
            {
                new ActivityDependency
                {
                    Activity = copyBlobActivity,
                    DependencyConditions = new List<String> { "Succeeded" }
                }
            }
        }

Url 屬性中,從您的 Logic Apps 工作流程貼上 HTTP POST URL 端點。 在 Body 屬性中,傳遞 EmailRequest 類別的執行個體。 電子郵件要求包含下列屬性:

  • 訊息。 傳遞 @{activity('CopyBlobtoBlob').output.dataWritten 的值。 存取先前複製活動的屬性,並傳遞 dataWritten 的值。 對於失敗案例,請傳遞錯誤輸出,而不是 @{activity('CopyBlobtoBlob').error.message
  • Data Factory 名稱。 傳遞 @{pipeline().DataFactory} 的值。此系統變數可讓您存取對應的資料處理站名稱。 如需系統變數的清單,請參閱系統變數
  • 管線名稱。 傳遞 @{pipeline().Pipeline} 的值。 此系統變數可讓您存取對應的管線名稱。
  • 接收者。 傳遞 "@pipeline().parameters.receiver" 的值。 存取管線參數。

此程式碼會建立新的活動相依性,而此相依性取決於先前的複製活動。

建立管線執行

將下列程式碼新增至 Main 方法以觸發管線執行。

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Main 類別

最終的 Main 方法應該會顯示如下。

// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Factory df = CreateOrUpdateDataFactory(client);

client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));

Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

建置並執行您的程式來觸發管道執行!

監視管線執行

  1. 將下列程式碼加入 Main 方法:

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    

    此程式碼會持續檢查執行狀態,直到它完成資料複製為止。

  2. 將下列程式碼新增至 Main 方法以擷取複製活動執行的詳細資料,例如讀取/寫入的資料大小:

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList();
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(activityRuns.First().Output);
        //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons);
    }
    else
        Console.WriteLine(activityRuns.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

執行程式碼

建置並啟動應用程式,然後確認管線執行。

應用程式會顯示建立資料處理站、連結服務、資料集、管線和管線執行的進度。 然後會檢查管線執行狀態。 請等待出現複製活動執行詳細資料及讀取/寫入的資料大小。 然後,使用 Azure 儲存體總管之類的工具,確認 Blob 已從 inputBlobPath 複製到 outputBlobPath,如同您在變數中的指定。

輸出應該會如下列範例所示:

Creating data factory DFTutorialTest...
{
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "type": "AzureStorage",
  "typeProperties": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
  }
}
Creating dataset SourceStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sourceBlobContainer"
    },
    "fileName": "input.txt"
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating dataset SinkStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sinkBlobContainer"
    }
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating pipeline Adfv2TutorialBranchCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SourceStorageDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SinkStorageDataset"
          }
        ],
        "name": "CopyBlobtoBlob"
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendSuccessEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').error.message}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendFailEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      },
      "receiver": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 20,
  "dataWritten": 20,
  "copyDuration": 4,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}

Press any key to exit...

您已在此教學課程中執行下列工作:

  • 建立資料處理站
  • 建立 Azure 儲存體連結服務
  • 建立 Azure Blob 資料集
  • 建立包含複製活動和網路活動的管道
  • 將活動的輸出傳送至後續的活動
  • 使用參數傳遞和系統變數
  • 啟動管道執行
  • 監視管道和活動執行

您現在可以繼續閱讀「概念」一節,以了解 Azure Data Factory 的詳細資訊。