创建数据工厂活动和管道
Azure 数据工厂中的活动定义将对数据执行的操作,这些操作分为以下三类:
- 数据移动活动
- 数据转换活动
- 控制活动
数据移动活动
数据移动活动只是将数据从一个数据存储移动到另一个数据存储。 你可以使用复制活动或 JSON 来执行数据移动活动。 支持各种类别的数据存储作为源和接收器。 此列表不断增加,你可以在此处找到最新信息。
数据转换活动
可以使用映射数据流在 Azure 数据工厂的创作工具中在本地执行数据转换活动。 此外,还可以调用计算资源以通过转换来更改或增强数据,或执行数据分析。 其中包括计算技术,例如 Azure Databricks、Azure Batch、SQL 数据库和 Azure Synapse Analytics、机器学习服务、Azure 虚拟机和 HDInsight。 可以使用存储在目录中的任何现有 SQL Server Integration Services (SSIS) 包在 Azure 中执行
此列表始终不断发展,你可以在此处获取最新信息。
控制活动
以图形方式创作 ADF 解决方案时,可以使用设计中的控制流来编排管道活动,包括将活动按顺序链接起来、设置分支、在管道级别定义形参,以及在按需或通过触发器调用管道时传递实参。 当前的功能包括:
控制活动 | 描述 |
---|---|
执行管道活动 | Execute Pipeline 活动允许数据工厂管道调用另一个管道。 |
ForEach 活动 | ForEach 活动在管道中定义重复的控制流。 此活动用于循环访问集合,并在循环中执行指定的活动。 此活动的循环实现类似于采用编程语言的 Foreach 循环结构。 |
Web 活动 | Web 活动可用于从数据工厂管道调用自定义的 REST 终结点。 可以传递数据集和链接服务以供活动使用和访问。 |
查找活动 | 查找活动可用于从任何外部源读取或查找记录/表名称/值。 此输出可进一步由后续活动引用。 |
获取元数据活动 | GetMetadata 活动可用于检索 Azure 数据工厂中的任何数据的元数据。 |
Until 活动 | 实现类似于采用编程语言的 Do-Until 循环结构的 Do-Until 循环。 它在循环中将执行一组活动,直到与活动相关联的条件的计算结果为 true。 你可以在数据工厂中为 Until 活动指定超时值。 |
If Condition 活动 | If Condition 可用于基于计算结果为 true 或 false 的条件进行分支。 If Condition 活动可提供 if 语句在编程语言中提供相同的功能。 当条件计算结果为 true 时,该活动会计算一组活动,当条件计算结果为 false 时,它会计算另一组活动。 |
Wait 活动 | 在管道中使用等待活动时,管道将等待一段指定的时间,然后继续执行后续活动。 |
可以在此处获取最新信息。
活动和管道
定义活动
使用 JSON 表示法时,活动部分可以在其中定义一个或多个活动。 有两种主要类型的活动:执行和控制活动。 执行(也称为计算)活动包括数据移动和数据转换活动。 它们具有以下顶级结构:
{
"name": "Execution Activity Name",
"description": "description",
"type": "<ActivityType>",
"typeProperties":
{
},
"linkedServiceName": "MyLinkedService",
"policy":
{
},
"dependsOn":
{
}
}
下表描述了上述 JSON 中的属性:
属性 | 描述 | 必需 |
---|---|---|
name | 活动的名称。 | 是 |
description | 描述活动的含义或用途的文本。 | 否 |
type | 定义活动的类型。 | 是 |
linkedServiceName | 活动使用的链接服务的名称。 | 是,适用于 HDInsight、机器学习批处理评分活动和存储过程活动 |
typeProperties | typeProperties 部分的属性取决于每个活动类型。 | 否 |
policy | 影响活动运行时行为的策略。 该属性包括超时和重试行为。 | 否 |
dependsOn | 该属性用于定义活动依赖项,以及后续活动对以前活动的依赖方式。 | 否 |
定义控制活动
数据工厂中的控件活动以 JSON 格式定义,如下所示:
{
"name": "Control Activity Name",
"description": "description",
"type": "<ActivityType>",
"typeProperties":
{
},
"dependsOn":
{
}
}
下表描述了上述 JSON 中的属性:
属性 | 描述 | 必需 |
---|---|---|
name | 活动的名称。 | 是 |
description | 描述活动的含义或用途的文本。 | 是 |
type | 定义活动的类型。 | 是 |
typeProperties | typeProperties 部分的属性取决于每个活动类型。 | 否 |
dependsOn | 该属性用于定义活动依赖项,以及后续活动对以前活动的依赖方式。 | 否 |
定义管道
下面介绍如何以 JSON 格式定义管道:
{
"name": "PipelineName",
"properties":
{
"description": "pipeline description",
"activities":
[
],
"parameters": {
}
}
}
下表描述了上述 JSON 中的属性:
属性 | 描述 | 必需 |
---|---|---|
name | 管道的名称。 | 是 |
description | 描述管道用途的文本。 | 否 |
活动 | activities 节中可定义有一个或多个活动。 | 是 |
parameters | 参数部分可在在管道内定义一个或多个参数,使你的管道能够灵活地重复使用。 | 否 |
示例
以下 JSON 定义名为“MyFirstPipeline”的管道,该管道包含 HDInsightHive 的一种活动类型,该活动类型将从存储在链接服务“StorageLinkedService”中的脚本名称“partitionweblogs.hql”调用查询,其输入名为“AzureBlobInput”,并且输出名为“AzureBlobOutput”。 它针对名为“HDInsightOnDemandLinkedService”的链接服务中定义的计算资源执行此操作
管道计划每月执行一次,如果失败,它将尝试执行 3 次。
{
"name": "MyFirstPipeline",
"properties": {
"description": "My first Azure Data Factory pipeline",
"activities": [
{
"type": "HDInsightHive",
"typeProperties": {
"scriptPath": "adfgetstarted/script/partitionweblogs.hql",
"scriptLinkedService": "StorageLinkedService",
"defines": {
"inputtable": "wasb://adfgetstarted@ctostorageaccount.blob.core.windows.net/inputdata",
"partitionedtable": "wasb://adfgetstarted@ctostorageaccount.blob.core.windows.net/partitioneddata"
}
},
"inputs": [
{
"name": "AzureBlobInput"
}
],
"outputs": [
{
"name": "AzureBlobOutput"
}
],
"policy": {
"concurrency": 1,
"retry": 3
},
"scheduler": {
"frequency": "Month",
"interval": 1
},
"name": "RunSampleHiveActivity",
"linkedServiceName": "HDInsightOnDemandLinkedService"
}
],
"start": "2017-04-01T00:00:00Z",
"end": "2017-04-02T00:00:00Z",
"isPaused": false,
"hubName": "ctogetstarteddf_hub",
"pipelineMode": "Scheduled"
}
}