Actividades de bifurcación y encadenamiento en una canalización de Data Factory
SE APLICA A: Azure Data Factory Azure Synapse Analytics
Sugerencia
Pruebe Data Factory en Microsoft Fabric, una solución de análisis todo en uno para empresas. Microsoft Fabric abarca todo, desde el movimiento de datos hasta la ciencia de datos, el análisis en tiempo real, la inteligencia empresarial y los informes. Obtenga información sobre cómo iniciar una nueva evaluación gratuita.
En este tutorial creará una canalización de Data Factory que muestra algunas de las características del flujo de control. Esta canalización copia de un contenedor en Azure Blob Storage a otro contenedor en la misma cuenta de almacenamiento. Si la actividad de copia se realiza correctamente, la canalización envía los detalles de la operación de copia correcta en un correo electrónico. Esa información puede incluir la cantidad de datos escritos. Si se produce un error en la actividad de copia, la canalización envía los detalles del error de copia, como el mensaje de error, en un correo electrónico. A lo largo del tutorial, verá cómo pasar parámetros.
En este gráfico se proporciona información general sobre el escenario:
En el tutorial se muestra cómo realizar las siguientes tareas:
- Crear una factoría de datos
- Creación de un servicio vinculado de Azure Storage
- Creación de un conjunto de datos del blob de Azure
- Creación de una canalización que contiene una actividad de copia y una actividad web
- Envío de los resultados de las actividades en actividades subsiguientes
- Uso del paso de parámetros y de las variables del sistema
- Inicio de la ejecución de una canalización
- Supervisión de las ejecuciones de canalización y actividad
En este tutorial se usa SDK de .NET. Puede usar otros mecanismos para interactuar con Azure Data Factory. Si quiere acceder a inicios rápidos de Data Factory, consulte Inicios rápidos en 5 minutos.
Si no tiene una suscripción a Azure, cree una cuenta gratuita antes de empezar.
Requisitos previos
- Cuenta de Azure Storage. Usará el almacenamiento de blobs como almacén de datos de origen. Si no dispone de una, consulte Crear una cuenta de almacenamiento.
- Explorador de Azure Storage Para instalar esta herramienta, consulte Explorador de Azure Storage.
- Azure SQL Database. Usará la base de datos como un almacén de datos receptor. Si no tiene una base de datos en Azure SQL Database, consulte el artículo Creación de una base de datos en Azure SQL Database.
- Visual Studio. En este artículo se utiliza Visual Studio 2019.
- SDK de Azure para .NET. Descargue e instale el SDK de Azure para .NET.
Para obtener una lista de las regiones de Azure en las que Data Factory está disponible actualmente, consulte Productos disponibles por región. Los almacenes de datos y los procesos pueden estar en otras regiones. Los almacenes incluyen Azure Storage y Azure SQL Database. Los procesos incluyen HDInsight, que se usa en Data Factory.
Crea una aplicación como se describe en Crear una aplicación de Microsoft Entra. Siga las instrucciones del mismo artículo para asignar la aplicación al rol Colaborador. Necesitará varios valores para partes posteriores de este tutorial, como Id. de aplicación (cliente) e Id. de directorio (inquilino) .
Creación de una tabla de blobs
Abra un editor de texto. Copie el siguiente texto y guárdelo localmente como input.txt.
Ethel|Berg Tamika|Walsh
Abra el Explorador de Azure Storage. Expanda la cuenta de almacenamiento. Haga clic con el botón derecho en Contenedores de blob y seleccione Crear contenedor de blobs.
Asigne al nuevo contenedor el nombre adfv2branch y seleccione Cargar para agregar el archivo input.txt al contenedor.
Creación de un proyecto de Visual Studio
Cree una aplicación de consola .NET de C#:
- Inicie Visual Studio y seleccione Crear un proyecto.
- En Crear un proyecto, elija Aplicación de consola (.NET Framework) para C# y seleccione Siguiente.
- Asigne al proyecto el nombre ADFv2BranchTutorial.
- Seleccione Versión de .NET 4.5.2 o superior y, a continuación, seleccione Crear.
Instalación de paquetes NuGet
Seleccione Herramientas>Administrador de paquetes NuGet>Consola del Administrador de paquetes.
En la Consola del Administrador de paquetes, ejecute los comandos siguientes para instalar los paquetes. Consulte el paquete NuGet Microsoft.Azure.Management.DataFactory para ver los detalles.
Install-Package Microsoft.Azure.Management.DataFactory Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
Creación de un cliente de factoría de datos
Abra el archivo Program.cs y agregue las siguientes instrucciones:
using System; using System.Collections.Generic; using System.Linq; using Microsoft.Rest; using Microsoft.Azure.Management.ResourceManager; using Microsoft.Azure.Management.DataFactory; using Microsoft.Azure.Management.DataFactory.Models; using Microsoft.IdentityModel.Clients.ActiveDirectory;
Agregue estas variables estáticas a la clase
Program
. Reemplace los marcadores de posición con sus propios valores.// Set variables static string tenantID = "<tenant ID>"; static string applicationId = "<application ID>"; static string authenticationKey = "<Authentication key for your application>"; static string subscriptionId = "<Azure subscription ID>"; static string resourceGroup = "<Azure resource group name>"; static string region = "East US"; static string dataFactoryName = "<Data factory name>"; // Specify the source Azure Blob information static string storageAccount = "<Azure Storage account name>"; static string storageKey = "<Azure Storage account key>"; // confirm that you have the input.txt file placed in th input folder of the adfv2branch container. static string inputBlobPath = "adfv2branch/input"; static string inputBlobName = "input.txt"; static string outputBlobPath = "adfv2branch/output"; static string emailReceiver = "<specify email address of the receiver>"; static string storageLinkedServiceName = "AzureStorageLinkedService"; static string blobSourceDatasetName = "SourceStorageDataset"; static string blobSinkDatasetName = "SinkStorageDataset"; static string pipelineName = "Adfv2TutorialBranchCopy"; static string copyBlobActivity = "CopyBlobtoBlob"; static string sendFailEmailActivity = "SendFailEmailActivity"; static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
Agregue el siguiente código al método
Main
. Este código crea una instancia de la claseDataFactoryManagementClient
. Este objeto se usa después para crear la factoría de datos, el servicio vinculado, los conjuntos de datos y la canalización. También se puede usar para supervisar los detalles de ejecución de la canalización.// Authenticate and create a data factory management client var context = new AuthenticationContext("https://login.windows.net/" + tenantID); ClientCredential cc = new ClientCredential(applicationId, authenticationKey); AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result; ServiceClientCredentials cred = new TokenCredentials(result.AccessToken); var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Crear una factoría de datos
Agregue un método
CreateOrUpdateDataFactory
al archivo Program.cs:static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client) { Console.WriteLine("Creating data factory " + dataFactoryName + "..."); Factory resource = new Factory { Location = region }; Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings)); Factory response; { response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource); } while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation") { System.Threading.Thread.Sleep(1000); } return response; }
Agregue la siguiente línea al método
Main
que crea una factoría de datos:Factory df = CreateOrUpdateDataFactory(client);
Creación de un servicio vinculado de Azure Storage
Agregue un método
StorageLinkedServiceDefinition
al archivo Program.cs:static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client) { Console.WriteLine("Creating linked service " + storageLinkedServiceName + "..."); AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService { ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey) }; Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)); LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName); return linkedService; }
Agregue la siguiente línea al método
Main
que crea un servicio vinculado de Azure Storage:client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
Para más información acerca de las propiedades admitidas y sus detalles, consulte Propiedades del servicio vinculado.
Creación de conjuntos de datos
En esta sección se crean dos conjuntos de datos: uno para el origen y otro para el receptor.
Creación de un conjunto de datos para un blob de Azure de origen
Agregue un método que cree un conjunto de datos de blob de Azure. Para más información acerca de las propiedades admitidas y sus detalles, consulte Propiedades del conjunto de datos de blob de Azure.
Agregue un método SourceBlobDatasetDefinition
al archivo Program.cs:
static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
AzureBlobDataset blobDataset = new AzureBlobDataset
{
FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
FileName = inputBlobName,
LinkedServiceName = new LinkedServiceReference
{
ReferenceName = storageLinkedServiceName
}
};
Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
return dataset;
}
Se define un conjunto de datos que representa los datos de origen del blob de Azure. Este conjunto de datos de blob hace referencia al servicio vinculado de Azure Storage admitido del paso anterior. El conjunto de datos de blob describe la ubicación del blob desde el que se va a copiar: FolderPath y FileName.
Tenga en cuenta el uso de los parámetros en FolderPath. sourceBlobContainer
es el nombre del parámetro; la expresión se reemplaza con los valores pasados en la ejecución de la canalización. La sintaxis para definir los parámetros es @pipeline().parameters.<parameterName>
.
Creación de un conjunto de datos para un blob de Azure receptor
Agregue un método
SourceBlobDatasetDefinition
al archivo Program.cs:static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client) { Console.WriteLine("Creating dataset " + blobSinkDatasetName + "..."); AzureBlobDataset blobDataset = new AzureBlobDataset { FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" }, LinkedServiceName = new LinkedServiceReference { ReferenceName = storageLinkedServiceName } }; Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)); DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName); return dataset; }
Agregue el siguiente código al método
Main
que crea los conjuntos de datos de blob de Azure de origen y recepción.client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client)); client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
Cree una clase de C#: EmailRequest
En el proyecto de C#, cree una clase denominada EmailRequest
. Esta clase define qué propiedades envía la canalización en la solicitud del cuerpo al enviar un correo electrónico. En este tutorial, la canalización envía cuatro propiedades desde la canalización al correo electrónico:
- Message. Cuerpo del correo electrónico. Si la copia se realiza correctamente, esta propiedad contiene la cantidad de datos escritos. Si la copia no se realiza correctamente, esta propiedad contiene los detalles del error.
- Nombre de factoría de datos. El nombre de la factoría de datos.
- Nombre de canalización. Nombre de la canalización.
- Receptor. Parámetro que se pasa. Esta propiedad especifica el destinatario del correo electrónico.
class EmailRequest
{
[Newtonsoft.Json.JsonProperty(PropertyName = "message")]
public string message;
[Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
public string dataFactoryName;
[Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
public string pipelineName;
[Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
public string receiver;
public EmailRequest(string input, string df, string pipeline, string receiverName)
{
message = input;
dataFactoryName = df;
pipelineName = pipeline;
receiver = receiverName;
}
}
Creación de puntos de conexión de flujo de trabajo del correo electrónico
Para desencadenar el envío de un correo electrónico, use Azure Logic Apps a fin de definir el flujo de trabajo. Para obtener más información, consulte Creación de un ejemplo de flujo de trabajo de aplicación lógica de Consumo.
Flujo de trabajo del correo electrónico de operación correcta
En Azure Portal, cree un flujo de trabajo de aplicación lógica denominado CopySuccessEmail
. Agregue el desencadenador de solicitud denominado Cuando se recibe una solicitud HTTP. En el desencadenador de solicitud, rellene el cuadro Esquema JSON del cuerpo de la solicitud con el siguiente JSON:
{
"properties": {
"dataFactoryName": {
"type": "string"
},
"message": {
"type": "string"
},
"pipelineName": {
"type": "string"
},
"receiver": {
"type": "string"
}
},
"type": "object"
}
El flujo de trabajo tiene un aspecto similar al del ejemplo siguiente:
Este contenido JSON se corresponde con la clase EmailRequest
creada en la sección anterior.
Agregue la acción de Outlook de Office 365 denominada Enviar un correo electrónico. Para esta acción, personalice el formato del correo electrónico según su preferencia. Para ello, use las propiedades que se pasan en el esquema JSON del Cuerpo de la solicitud. Este es un ejemplo:
Después de guardar el flujo de trabajo, copie y guarde el valor de Dirección URL de HTTP POST del desencadenador.
Flujo de trabajo del correo electrónico de operación incorrecta
Clone el flujo de trabajo de la aplicación lógica CopySuccessEmail
en un nuevo flujo de trabajo denominado CopyFailEmail
. En el desencadenador de solicitud, el esquema JSON del cuerpo de la solicitud es el mismo. Cambie el formato del correo electrónico, por ejemplo, la parte Subject
, para adaptarlo para que sea un correo electrónico de operación incorrecta. Este es un ejemplo:
Después de guardar el flujo de trabajo, copie y guarde el valor de Dirección URL de HTTP POST del desencadenador.
Ahora debería tener dos direcciones URL de flujo de trabajo, como en los ejemplos siguientes:
//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
Crear una canalización
Vuelva al proyecto en Visual Studio. Ahora vamos a agregar el código que crea una canalización con una actividad de copia y la propiedad DependsOn
. En este tutorial, la canalización contiene una actividad, la actividad de copia, que toma el conjunto de datos del blob como origen y otro conjunto de datos del blob como receptor. Si la actividad de copia se realiza de forma correcta o incorrecta, llama a distintas tareas de correo electrónico.
En esta canalización, use las siguientes características:
- Parámetros
- Actividad web
- Dependencia de actividades
- Uso de la salida de una actividad como entrada de otra actividad
Agregue este método al proyecto. En las siguientes secciones se ofrecen más detalles.
static PipelineResource PipelineDefinition(DataFactoryManagementClient client) { Console.WriteLine("Creating pipeline " + pipelineName + "..."); PipelineResource resource = new PipelineResource { Parameters = new Dictionary<string, ParameterSpecification> { { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } }, { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } }, { "receiver", new ParameterSpecification { Type = ParameterType.String } } }, Activities = new List<Activity> { new CopyActivity { Name = copyBlobActivity, Inputs = new List<DatasetReference> { new DatasetReference { ReferenceName = blobSourceDatasetName } }, Outputs = new List<DatasetReference> { new DatasetReference { ReferenceName = blobSinkDatasetName } }, Source = new BlobSource { }, Sink = new BlobSink { } }, new WebActivity { Name = sendSuccessEmailActivity, Method = WebActivityMethod.POST, Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000", Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"), DependsOn = new List<ActivityDependency> { new ActivityDependency { Activity = copyBlobActivity, DependencyConditions = new List<String> { "Succeeded" } } } }, new WebActivity { Name = sendFailEmailActivity, Method =WebActivityMethod.POST, Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000", Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"), DependsOn = new List<ActivityDependency> { new ActivityDependency { Activity = copyBlobActivity, DependencyConditions = new List<String> { "Failed" } } } } } }; Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings)); return resource; }
Agregue la siguiente línea al método
Main
que crea la canalización:client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
Parámetros
La primera sección del código de la canalización define parámetros.
sourceBlobContainer
. El conjunto de datos de blob de origen consume este parámetro en la canalización.sinkBlobContainer
. El conjunto de datos de blob de receptor consume este parámetro en la canalización.receiver
. Las dos actividades web de la canalización que envían correos electrónicos de fin correcto o error al receptor usan este parámetro.
Parameters = new Dictionary<string, ParameterSpecification>
{
{ "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
{ "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
{ "receiver", new ParameterSpecification { Type = ParameterType.String } }
},
Actividad web
Una actividad web permite efectuar una llamada a cualquier punto de conexión REST. Para más información sobre la actividad, consulte Actividad web en Azure Data Factory. Esta canalización usa actividades web para llamar al flujo de trabajo de correo electrónico de Logic Apps. Se crean dos actividades web: una que llama al flujo de trabajo CopySuccessEmail
y otra que llama a CopyFailWorkFlow
.
new WebActivity
{
Name = sendCopyEmailActivity,
Method = WebActivityMethod.POST,
Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
DependsOn = new List<ActivityDependency>
{
new ActivityDependency
{
Activity = copyBlobActivity,
DependencyConditions = new List<String> { "Succeeded" }
}
}
}
En la propiedad Url
, pegue los puntos de conexión de Dirección URL HTTP POST de los flujos de trabajo de Logic Apps. En la propiedad Body
, pase una instancia de la clase EmailRequest
. La solicitud de correo electrónico contiene las siguientes propiedades:
- Message. Pasa el valor de
@{activity('CopyBlobtoBlob').output.dataWritten
. Accede a una propiedad de la actividad de copia anterior y pasa el valor dedataWritten
. En caso de error, pasa la salida de error en lugar de@{activity('CopyBlobtoBlob').error.message
. - Nombre de factoría de datos. Pasa el valor de
@{pipeline().DataFactory}
. Esta variable del sistema permite acceder al nombre de la factoría de datos correspondiente. Para obtener una lista de las variables del sistema, consulte Variables del sistema. - Nombre de canalización. Pasa el valor de
@{pipeline().Pipeline}
. Esta variable del sistema permite acceder al nombre de la canalización correspondiente. - Receptor. Pasa el valor de
"@pipeline().parameters.receiver"
. Accede a los parámetros de la canalización.
Este código crea una nueva dependencia de actividad que depende de la actividad de copia anterior.
Creación de una ejecución de canalización
Agregue el siguiente código al método Main
que desencadena una ejecución de canalización.
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
{ "sourceBlobContainer", inputBlobPath },
{ "sinkBlobContainer", outputBlobPath },
{ "receiver", emailReceiver }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
Clase Main
El método Main
final debe tener el aspecto siguiente.
// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Factory df = CreateOrUpdateDataFactory(client);
client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
{ "sourceBlobContainer", inputBlobPath },
{ "sinkBlobContainer", outputBlobPath },
{ "receiver", emailReceiver }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
Compile y ejecute su programa para desencadenar una ejecución de canalización.
Supervisar una ejecución de canalización
Agregue el siguiente código al método
Main
:// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); PipelineRun pipelineRun; while (true) { pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress") System.Threading.Thread.Sleep(15000); else break; }
Este código comprueba continuamente el estado de la ejecución hasta que termina de copiar los datos.
Agregue el siguiente código al método
Main
que recupera detalles de la ejecución de la actividad de copia, como el tamaño de los datos leídos o escritos:// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun( resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList(); if (pipelineRun.Status == "Succeeded") { Console.WriteLine(activityRuns.First().Output); //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons); } else Console.WriteLine(activityRuns.First().Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
Ejecución del código
Compile e inicie la aplicación y, a continuación, compruebe la ejecución de la canalización.
La aplicación muestra el progreso de la creación de la factoría de datos, el servicio vinculado, los conjuntos de datos, la canalización y la ejecución de la canalización. A continuación, comprueba el estado de la ejecución de canalización. Espere hasta que vea los detalles de ejecución de actividad de copia con el tamaño de los datos leídos/escritos. A continuación, use herramientas como el Explorador de Azure Storage para comprobar que los blobs se copian a outputBlobPath desde inputBlobPath, como se especificó en las variables.
La salida debe ser similar al siguiente ejemplo:
Creating data factory DFTutorialTest...
{
"location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
}
}
Creating dataset SourceStorageDataset...
{
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"type": "Expression",
"value": "@pipeline().parameters.sourceBlobContainer"
},
"fileName": "input.txt"
},
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureStorageLinkedService"
}
}
Creating dataset SinkStorageDataset...
{
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"type": "Expression",
"value": "@pipeline().parameters.sinkBlobContainer"
}
},
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureStorageLinkedService"
}
}
Creating pipeline Adfv2TutorialBranchCopy...
{
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
}
},
"inputs": [
{
"type": "DatasetReference",
"referenceName": "SourceStorageDataset"
}
],
"outputs": [
{
"type": "DatasetReference",
"referenceName": "SinkStorageDataset"
}
],
"name": "CopyBlobtoBlob"
},
{
"type": "WebActivity",
"typeProperties": {
"method": "POST",
"url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
"body": {
"message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
"dataFactoryName": "@{pipeline().DataFactory}",
"pipelineName": "@{pipeline().Pipeline}",
"receiver": "@pipeline().parameters.receiver"
}
},
"name": "SendSuccessEmailActivity",
"dependsOn": [
{
"activity": "CopyBlobtoBlob",
"dependencyConditions": [
"Succeeded"
]
}
]
},
{
"type": "WebActivity",
"typeProperties": {
"method": "POST",
"url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
"body": {
"message": "@{activity('CopyBlobtoBlob').error.message}",
"dataFactoryName": "@{pipeline().DataFactory}",
"pipelineName": "@{pipeline().Pipeline}",
"receiver": "@pipeline().parameters.receiver"
}
},
"name": "SendFailEmailActivity",
"dependsOn": [
{
"activity": "CopyBlobtoBlob",
"dependencyConditions": [
"Failed"
]
}
]
}
],
"parameters": {
"sourceBlobContainer": {
"type": "String"
},
"sinkBlobContainer": {
"type": "String"
},
"receiver": {
"type": "String"
}
}
}
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
"dataRead": 20,
"dataWritten": 20,
"copyDuration": 4,
"throughput": 0.01,
"errors": [],
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}
Press any key to exit...
Contenido relacionado
En este tutorial ha realizado las tareas siguientes:
- Crear una factoría de datos
- Creación de un servicio vinculado de Azure Storage
- Creación de un conjunto de datos del blob de Azure
- Creación de una canalización que contiene una actividad de copia y una actividad web
- Envío de los resultados de las actividades en actividades subsiguientes
- Uso del paso de parámetros y de las variables del sistema
- Inicio de la ejecución de una canalización
- Supervisión de las ejecuciones de canalización y actividad
Ahora puede continuar en la sección de conceptos para más información sobre Azure Data Factory.