Udostępnij za pośrednictwem


Rozgałęzianie działań i tworzenie łańcuchów działań w potoku usługi Data Factory

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Napiwek

Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !

W tym samouczku utworzysz potok usługi Data Factory, który prezentuje niektóre funkcje przepływu sterowania. Ten potok kopiuje z kontenera w usłudze Azure Blob Storage do innego kontenera na tym samym koncie magazynu. Jeśli działanie kopiowania zakończy się pomyślnie, potok wysyła szczegóły pomyślnej operacji kopiowania w wiadomości e-mail. Te informacje mogą obejmować ilość zapisanych danych. Jeśli działanie kopiowania nie powiedzie się, wysyła szczegóły błędu kopiowania, takie jak komunikat o błędzie, w wiadomości e-mail. W samouczku pokazano, jak przekazać parametry.

Ta grafika zawiera omówienie scenariusza:

Diagram przedstawia usługę Azure Blob Storage, która jest obiektem docelowym kopii, która po pomyślnym zakończeniu wysyła wiadomość e-mail ze szczegółami lub, w przypadku niepowodzenia, wysyła wiadomość e-mail ze szczegółami błędu.

W tym samouczku przedstawiono sposób wykonywania następujących zadań:

  • Tworzenie fabryki danych
  • Tworzenie połączonej usługi Azure Storage
  • Tworzenie zestawu danych obiektów blob platformy Azure
  • Tworzenie potoku zawierającego działanie kopiowania i działanie internetowe
  • Wysyłanie danych wyjściowych działań do kolejnych działań
  • Używanie przekazywania parametrów i zmiennych systemowych
  • Uruchamianie potoku
  • Monitorowanie uruchomień działań i potoku

W tym samouczku jest używany zestaw SDK platformy .NET. Do interakcji z usługą Azure Data Factory można użyć innych mechanizmów. Aby zapoznać się z przewodnikami Szybki start usługi Data Factory, zobacz 5-minutowe przewodniki Szybki start.

Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.

Wymagania wstępne

  • Konto usługi Azure Storage. Magazyn obiektów blob jest używany jako źródłowy magazyn danych. Jeśli nie masz konta usługi Azure Storage, zobacz Tworzenie konta magazynu.
  • Eksplorator magazynu Azure. Aby zainstalować to narzędzie, zobacz Eksplorator usługi Azure Storage.
  • Usługa Azure SQL Database. Baza danych jest używana jako magazyn danych będący ujściem. Jeśli nie masz bazy danych w usłudze Azure SQL Database, zobacz Tworzenie bazy danych w usłudze Azure SQL Database.
  • Visual Studio. W tym artykule jest używany program Visual Studio 2019.
  • Zestaw .NET SDK platformy Azure. Pobierz i zainstaluj zestaw AZURE .NET SDK.

Aby uzyskać listę regionów świadczenia usługi Azure, w których usługa Data Factory jest obecnie dostępna, zobacz Dostępność produktów według regionów. Magazyny danych i obliczenia mogą znajdować się w innych regionach. Magazyny obejmują usługi Azure Storage i Azure SQL Database. Obliczenia obejmują usługę HDInsight, której używa usługa Data Factory.

Utwórz aplikację zgodnie z opisem w temacie Tworzenie aplikacji firmy Microsoft Entra. Przypisz aplikację do roli Współautor , postępując zgodnie z instrukcjami w tym samym artykule. W kolejnych częściach tego samouczka będzie potrzebnych kilka wartości, takich jak identyfikator aplikacji (klienta) i identyfikator katalogu (dzierżawy).

Tworzenie tabeli obiektów blob

  1. Uruchom edytor tekstów. Skopiuj następujący tekst i zapisz go lokalnie jako input.txt.

    Ethel|Berg
    Tamika|Walsh
    
  2. Otwórz Eksplorator usługi Azure Storage. Rozwiń konto magazynu. Kliknij prawym przyciskiem myszy pozycję Blob Containers (Kontenery obiektów blob) i wybierz polecenie Create Blob Container (Utwórz kontener obiektów blob).

  3. Nadaj nowemu kontenerowi nazwę adfv2branch i wybierz pozycję Przekaż , aby dodać plik input.txt do kontenera.

Tworzenie projektu programu Visual Studio

Utwórz aplikację konsolową platformy .NET w języku C#:

  1. Uruchom program Visual Studio i wybierz pozycję Utwórz nowy projekt.
  2. W obszarze Utwórz nowy projekt wybierz pozycję Aplikacja konsolowa (.NET Framework) dla języka C#, a następnie wybierz pozycję Dalej.
  3. Nadaj projektowi nazwę ADFv2BranchTutorial.
  4. Wybierz pozycję .NET w wersji 4.5.2 lub nowszej, a następnie wybierz pozycję Utwórz.

Instalowanie pakietów NuGet

  1. Wybierz pozycję Narzędzia NuGet>Menedżer pakietów> Menedżer pakietów Konsola.

  2. W oknie Konsola menedżera pakietów uruchom następujące polecenia, aby zainstalować pakiety. Aby uzyskać szczegółowe informacje, zapoznaj się z pakietem NuGet Microsoft.Azure.Management.DataFactory.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Tworzenie klienta fabryki danych

  1. Otwórz Program.cs i dodaj następujące instrukcje:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Dodaj te zmienne statyczne do Program klasy. Zastąp symbole zastępcze własnymi wartościami.

    // Set variables
    static string tenantID = "<tenant ID>";
    static string applicationId = "<application ID>";
    static string authenticationKey = "<Authentication key for your application>";
    static string subscriptionId = "<Azure subscription ID>";
    static string resourceGroup = "<Azure resource group name>";
    
    static string region = "East US";
    static string dataFactoryName = "<Data factory name>";
    
    // Specify the source Azure Blob information
    static string storageAccount = "<Azure Storage account name>";
    static string storageKey = "<Azure Storage account key>";
    // confirm that you have the input.txt file placed in th input folder of the adfv2branch container.
    static string inputBlobPath = "adfv2branch/input";
    static string inputBlobName = "input.txt";
    static string outputBlobPath = "adfv2branch/output";
    static string emailReceiver = "<specify email address of the receiver>";
    
    static string storageLinkedServiceName = "AzureStorageLinkedService";
    static string blobSourceDatasetName = "SourceStorageDataset";
    static string blobSinkDatasetName = "SinkStorageDataset";
    static string pipelineName = "Adfv2TutorialBranchCopy";
    
    static string copyBlobActivity = "CopyBlobtoBlob";
    static string sendFailEmailActivity = "SendFailEmailActivity";
    static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
    
  3. Dodaj następujący kod do metody Main: Ten kod tworzy wystąpienie DataFactoryManagementClient klasy. Następnie użyjesz tego obiektu do utworzenia fabryki danych, połączonej usługi, zestawów danych i potoku. Tego obiektu można również użyć do monitorowania szczegółów przebiegu potoku.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Tworzenie fabryki danych

  1. Dodaj metodę CreateOrUpdateDataFactory do pliku Program.cs :

    static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating data factory " + dataFactoryName + "...");
        Factory resource = new Factory
        {
            Location = region
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
    
        Factory response;
        {
            response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource);
        }
    
        while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
        {
            System.Threading.Thread.Sleep(1000);
        }
        return response;
    }
    
  2. Dodaj następujący wiersz do Main metody, która tworzy fabrykę danych:

    Factory df = CreateOrUpdateDataFactory(client);
    

Tworzenie połączonej usługi Azure Storage

  1. Dodaj metodę StorageLinkedServiceDefinition do pliku Program.cs :

    static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client)
    {
       Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
       AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService
       {
           ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
       };
       Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
       LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName);
       return linkedService;
    }
    
  2. Dodaj następujący wiersz do Main metody, która tworzy połączoną usługę Azure Storage:

    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
    

Aby uzyskać więcej informacji na temat obsługiwanych właściwości i szczegółów, zobacz Właściwości połączonej usługi.

Tworzenie zestawów danych

W tej sekcji utworzysz dwa zestawy danych— jeden dla źródła i jeden dla ujścia.

Tworzenie zestawu danych dla źródłowego obiektu blob platformy Azure

Dodaj metodę, która tworzy zestaw danych obiektów blob platformy Azure. Aby uzyskać więcej informacji na temat obsługiwanych właściwości i szczegółów, zobacz Właściwości zestawu danych obiektów blob platformy Azure.

Dodaj metodę SourceBlobDatasetDefinition do pliku Program.cs :

static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
    Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
    AzureBlobDataset blobDataset = new AzureBlobDataset
    {
        FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
        FileName = inputBlobName,
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        }
    };
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
    DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
    return dataset;
}

Należy zdefiniować zestaw danych reprezentujący źródło danych w obiekcie blob platformy Azure. Ten zestaw danych obiektów blob odnosi się do połączonej usługi Azure Storage obsługiwanej w poprzednim kroku. Zestaw danych obiektów blob opisuje lokalizację obiektu blob do skopiowania z: FolderPath i FileName.

Zwróć uwagę na użycie parametrów dla folderuPath. sourceBlobContainer to nazwa parametru, a wyrażenie jest zastępowane wartościami przekazanymi w przebiegu potoku. Składnia umożliwiająca zdefiniowanie parametrów: @pipeline().parameters.<parameterName>

Tworzenie zestawu danych dla ujścia obiektu blob platformy Azure

  1. Dodaj metodę SourceBlobDatasetDefinition do pliku Program.cs :

    static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating dataset " + blobSinkDatasetName + "...");
        AzureBlobDataset blobDataset = new AzureBlobDataset
        {
            FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" },
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = storageLinkedServiceName
            }
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
        DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName);
        return dataset;
    }
    
  2. Dodaj następujący kod do Main metody, która tworzy zestawy danych źródła obiektów blob platformy Azure i ujścia.

    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
    
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
    

Tworzenie klasy języka C#: EmailRequest

W projekcie języka C# utwórz klasę o nazwie EmailRequest. Ta klasa definiuje właściwości, które potok wysyła w żądaniu treści podczas wysyłania wiadomości e-mail. W tym samouczku potok wysyła cztery właściwości z potoku do wiadomości e-mail:

  • Wiadomość. Treść wiadomości e-mail. W przypadku pomyślnej kopii ta właściwość zawiera ilość zapisanych danych. W przypadku nieudanej kopii ta właściwość zawiera szczegóły błędu.
  • Nazwa fabryki danych. Nazwa fabryki danych.
  • Nazwa potoku. Nazwa potoku.
  • Odbiornik. Parametr, który przechodzi przez. Ta właściwość określa odbiorcę wiadomości e-mail.
    class EmailRequest
    {
        [Newtonsoft.Json.JsonProperty(PropertyName = "message")]
        public string message;

        [Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
        public string dataFactoryName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
        public string pipelineName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
        public string receiver;

        public EmailRequest(string input, string df, string pipeline, string receiverName)
        {
            message = input;
            dataFactoryName = df;
            pipelineName = pipeline;
            receiver = receiverName;
        }
    }

Tworzenie punktów końcowych przepływu pracy poczty e-mail

Aby wyzwolić wysyłanie wiadomości e-mail, użyjesz usługi Azure Logic Apps do zdefiniowania przepływu pracy. Aby uzyskać więcej informacji, zobacz Tworzenie przykładowego przepływu pracy aplikacji logiki zużycie.

Przepływ pracy wiadomości e-mail z informacją o powodzeniu

W witrynie Azure Portal utwórz przepływ pracy aplikacji logiki o nazwie CopySuccessEmail. Dodaj wyzwalacz Żądania o nazwie Po odebraniu żądania HTTP. W wyzwalaczu Żądanie wypełnij pole schematu JSON Treść żądania następującym kodem JSON:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

Przepływ pracy wygląda podobnie do następującego przykładu:

Przepływ pracy wiadomości e-mail z informacją o powodzeniu

Ta zawartość JSON jest zgodna z klasą EmailRequest utworzoną w poprzedniej sekcji.

Dodaj akcję usługi Office 365 Outlook o nazwie Wyślij wiadomość e-mail. W przypadku tej akcji dostosuj sposób formatowania wiadomości e-mail przy użyciu właściwości przekazanych w schemacie JSON treści żądania. Oto przykład:

Projektant przepływu pracy z akcją o nazwie Wyślij wiadomość e-mail.

Po zapisaniu przepływu pracy skopiuj i zapisz wartość adresu URL HTTP POST z wyzwalacza.

Przepływ pracy wiadomości e-mail z informacją o niepowodzeniu

Sklonuj CopySuccessEmail przepływ pracy aplikacji logiki do nowego przepływu pracy o nazwie CopyFailEmail. W wyzwalaczu Żądanie schemat JSON treści żądania jest taki sam. Zmień format wiadomości e-mail, na przykład element Subject, aby przekształcić wiadomość e-mail w wiadomość z informacją o niepowodzeniu. Oto przykład:

Projektant przepływu pracy i przepływ pracy wiadomości e-mail z niepowodzeniem.

Po zapisaniu przepływu pracy skopiuj i zapisz wartość adresu URL HTTP POST z wyzwalacza.

Teraz powinny istnieć dwa adresy URL przepływu pracy, takie jak następujące przykłady:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

Tworzenie potoku

Wróć do projektu w programie Visual Studio. Teraz dodamy kod, który tworzy potok z działaniem kopiowania i DependsOn właściwością. W tym samouczku potok zawiera jedno działanie, działanie kopiowania, które przyjmuje zestaw danych obiektów blob jako źródło i inny zestaw danych obiektów blob jako ujście. Jeśli działanie kopiowania zakończy się pomyślnie lub zakończy się niepowodzeniem, wywołuje różne zadania poczty e-mail.

W tym potoku użyto następujących funkcji:

  • Parametry
  • Działanie w sieci Web
  • Zależność działania
  • Używanie danych wyjściowych z działania jako danych wejściowych do innego działania
  1. Dodaj tę metodę do projektu. Poniższe sekcje zawierają bardziej szczegółowe informacje.

    static PipelineResource PipelineDefinition(DataFactoryManagementClient client)
            {
                Console.WriteLine("Creating pipeline " + pipelineName + "...");
                PipelineResource resource = new PipelineResource
                {
                    Parameters = new Dictionary<string, ParameterSpecification>
                    {
                        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    
                    },
                    Activities = new List<Activity>
                    {
                        new CopyActivity
                        {
                            Name = copyBlobActivity,
                            Inputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSourceDatasetName
                                }
                            },
                            Outputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSinkDatasetName
                                }
                            },
                            Source = new BlobSource { },
                            Sink = new BlobSink { }
                        },
                        new WebActivity
                        {
                            Name = sendSuccessEmailActivity,
                            Method = WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Succeeded" }
                                }
                            }
                        },
                        new WebActivity
                        {
                            Name = sendFailEmailActivity,
                            Method =WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Failed" }
                                }
                            }
                        }
                    }
                };
                Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
                return resource;
            }
    
  2. Dodaj następujący wiersz do Main metody, która tworzy potok:

    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
    

Parametry

Pierwsza sekcja kodu potoku definiuje parametry.

  • sourceBlobContainer. Źródłowy zestaw danych obiektów blob używa tego parametru w potoku.
  • sinkBlobContainer. Zestaw danych obiektów blob ujścia używa tego parametru w potoku.
  • receiver. Dwa działania sieci Web w potoku, które wysyłają wiadomości e-mail o powodzeniu lub niepowodzeniu do odbiorcy, używają tego parametru.
Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    },

Działanie w sieci Web

Działanie internetowe umożliwia wywołanie dowolnego punktu końcowego REST. Aby uzyskać więcej informacji na temat działania, zobacz Działanie internetowe w usłudze Azure Data Factory. Ten potok używa działania internetowego do wywoływania przepływu pracy poczty e-mail usługi Logic Apps. Utworzysz dwa działania internetowe: jedno, które wywołuje CopySuccessEmail przepływ pracy i jedno, które wywołuje element CopyFailWorkFlow.

        new WebActivity
        {
            Name = sendCopyEmailActivity,
            Method = WebActivityMethod.POST,
            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
            DependsOn = new List<ActivityDependency>
            {
                new ActivityDependency
                {
                    Activity = copyBlobActivity,
                    DependencyConditions = new List<String> { "Succeeded" }
                }
            }
        }

Url W właściwości wklej punkty końcowe adresu URL HTTP POST z przepływów pracy usługi Logic Apps. Body We właściwości przekaż wystąpienie EmailRequest klasy . Żądanie wiadomości e-mail zawiera następujące właściwości:

  • Wiadomość. Przekazuje wartość @{activity('CopyBlobtoBlob').output.dataWritten. Uzyskuje dostęp do właściwości poprzedniego działania kopiowania i przekazuje wartość dataWritten. W przypadku wiadomości dotyczącej niepowodzenia przekaż dane wyjściowe błędu zamiast elementu @{activity('CopyBlobtoBlob').error.message.
  • Nazwa fabryki danych. Przekazuje wartość Tej zmiennej @{pipeline().DataFactory} systemowej umożliwia dostęp do odpowiedniej nazwy fabryki danych. Aby uzyskać listę zmiennych systemowych, zobacz Zmienne systemowe.
  • Nazwa potoku. Przekazuje wartość @{pipeline().Pipeline}. Ta zmienna systemowa umożliwia dostęp do odpowiedniej nazwy potoku.
  • Odbiornik. Przekazuje wartość "@pipeline().parameters.receiver". Uzyskuje dostęp do parametrów potoku.

Ten kod tworzy nową zależność działania, która zależy od poprzedniego działania kopiowania.

Tworzenie uruchomienia potoku

Dodaj następujący kod do Main metody, która wyzwala uruchomienie potoku.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Klasa metody Main

Main Ostateczna metoda powinna wyglądać następująco.

// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Factory df = CreateOrUpdateDataFactory(client);

client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));

Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Skompiluj i uruchom program, aby wyzwolić uruchomienie potoku.

Monitorowanie uruchomienia potoku

  1. Dodaj do metody Main następujący kod:

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    

    Ten kod stale sprawdza stan przebiegu do momentu zakończenia kopiowania danych.

  2. Dodaj następujący kod do Main metody, która pobiera szczegóły przebiegu działania kopiowania, na przykład rozmiar odczyt/zapis danych:

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList();
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(activityRuns.First().Output);
        //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons);
    }
    else
        Console.WriteLine(activityRuns.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Uruchamianie kodu

Skompiluj i uruchom aplikację, a następnie zweryfikuj wykonywanie potoku.

Aplikacja wyświetla postęp tworzenia fabryki danych, połączonej usługi, zestawów danych, potoku i uruchomienia potoku. Następnie sprawdza stan uruchomienia potoku. Poczekaj na wyświetlenie szczegółów uruchomienia działania kopiowania z rozmiarem odczytanych/zapisanych danych. Następnie użyj narzędzi, takich jak Eksplorator usługi Azure Storage, aby sprawdzić, czy obiekt blob został skopiowany do parametru outputBlobPath z parametru inputBlobPath, jak określono w zmiennych.

Dane wyjściowe powinny przypominać następujący przykład:

Creating data factory DFTutorialTest...
{
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "type": "AzureStorage",
  "typeProperties": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
  }
}
Creating dataset SourceStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sourceBlobContainer"
    },
    "fileName": "input.txt"
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating dataset SinkStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sinkBlobContainer"
    }
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating pipeline Adfv2TutorialBranchCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SourceStorageDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SinkStorageDataset"
          }
        ],
        "name": "CopyBlobtoBlob"
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendSuccessEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').error.message}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendFailEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      },
      "receiver": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 20,
  "dataWritten": 20,
  "copyDuration": 4,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}

Press any key to exit...

W tym samouczku wykonaliśmy następujące zadania:

  • Tworzenie fabryki danych
  • Tworzenie połączonej usługi Azure Storage
  • Tworzenie zestawu danych obiektów blob platformy Azure
  • Tworzenie potoku zawierającego działanie kopiowania i działanie internetowe
  • Wysyłanie danych wyjściowych działań do kolejnych działań
  • Używanie przekazywania parametrów i zmiennych systemowych
  • Uruchamianie potoku
  • Monitorowanie uruchomień działań i potoku

Teraz możesz przejść do sekcji Pojęcia, aby uzyskać więcej informacji na temat usługi Azure Data Factory.