你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
Azure 数据工厂和 Azure Synapse Analytics 中的数据流活动
适用于: Azure 数据工厂 Azure Synapse Analytics
提示
试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用!
使用数据流活动通过映射数据流来转换和移动数据。 如果你不熟悉数据流,请参阅映射数据流概述
使用 UI 创建数据流活动
若要在管道中使用数据流活动,请完成以下步骤:
在管道“活动”窗格中搜索“数据流”,然后将数据流活动拖动到管道画布上。
选择画布上的新“数据流”活动(如果尚未选择),及其“设置”选项卡,以编辑其详细信息。
当数据流用于更改的数据捕获时,检查点键用于设置检查点。 可以覆盖它。 数据流活动使用 guid 值作为检查点键,而不是“管道名称 + 活动名称”,以便即使有任何重命名操作,它也可以始终跟踪客户的更改数据捕获状态。 所有现有数据流活动都使用旧模式键实现后向兼容性。 通过已启用更改数据捕获的数据流资源发布新数据流活动后的检查点键选项如下所示。
选择现有数据流,或使用“新建”按钮创建一个新数据流。 选择所需的其他选项以完成配置。
语法
{
"name": "MyDataFlowActivity",
"type": "ExecuteDataFlow",
"typeProperties": {
"dataflow": {
"referenceName": "MyDataFlow",
"type": "DataFlowReference"
},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "Fine",
"runConcurrently": true,
"continueOnError": true,
"staging": {
"linkedService": {
"referenceName": "MyStagingLinkedService",
"type": "LinkedServiceReference"
},
"folderPath": "my-container/my-folder"
},
"integrationRuntime": {
"referenceName": "MyDataFlowIntegrationRuntime",
"type": "IntegrationRuntimeReference"
}
}
Type 属性
属性 | 说明 | 允许的值 | 必须 |
---|---|---|---|
数据流 | 对正在执行的数据流的引用 | DataFlowReference | 是 |
integrationRuntime | 运行数据流的计算环境。 如果没有指定,则使用自动解析 Azure Integration Runtime。 | IntegrationRuntimeReference | 否 |
compute.coreCount | Spark 群集中使用的内核数。 仅当使用自动解析 Azure Integration Runtime 时才能指定 | 8、16、32、48、80、144、272 | 否 |
compute.computeType | Spark 群集中使用的计算类型。 仅当使用自动解析 Azure Integration Runtime 时才能指定 | “常规” | 否 |
staging.linkedService | 如果使用的是 Azure Synapse Analytics 源或接收器,请指定用于 PolyBase 暂存的存储帐户。 如果 Azure 存储配置了 VNet 服务终结点,则必须在存储帐户上启用“允许受信任的 Microsoft 服务”并使用托管标识身份验证,详见将 VNet 服务终结点与 Azure 存储配合使用的影响。 另请了解 Azure Blob 和 Azure Data Lake Storage Gen2 的所需配置。 |
LinkedServiceReference | 仅当数据流读取或写入 Azure Synapse Analytics 时 |
staging.folderPath | 如果使用的是 Azure Synapse Analytics 源或接收器,则为 blob 存储帐户中用于 PolyBase 暂存的文件夹路径 | 字符串 | 仅当数据流读取或写入 Azure Synapse Analytics 时 |
traceLevel | 设置数据流活动执行的日志记录级别 | 精细、粗略、无 | 否 |
在运行时动态调整数据流大小
可以动态设置“核心计数”和“计算类型”属性,以在运行时调整传入源数据的大小。 使用管道活动(例如“查找”或“获取元数据”),以便查找源数据集数据的大小。 然后,在“数据流”活动属性中使用“添加动态内容”。 可以选择小、中或大计算大小。 (可选)选择“自定义”并手动配置计算类型和核心数。
数据流集成运行时
选择用于数据流活动执行的 Integration Runtime。 在默认情况下,服务会将自动解析 Azure Integration Runtime 与四个辅助角色核心配合使用。 此 IR 具有常规用途计算类型,并在与服务实例相同的区域中运行。 对于已运营化的管道,强烈建议创建自己的 Azure Integration Runtime,用于定义执行数据流活动所需的特定区域、计算类型、核心计数和 TTL。
对于大多数生产工作负载,建议至少使用一个具有 8+8(总计 16)个 v-Core 配置和 10 分钟生存时间的“常规用途”计算类型。 通过设置小型 TTL,Azure IR 可以维护一个热群集,它不会出现冷群集那种需要几分钟时间才能启动的情况。 有关详细信息,请参阅 Azure Integration Runtime。
重要
数据流活动中的 Integration Runtime 选择仅适用于管道的已触发执行。 如果使用数据流调试管道,则可在群集(在调试会话中指定)上运行。
PolyBase
如果使用 Azure Synapse Analytics 作为接收器或源,则必须为 PolyBase 批量加载选择一个暂存位置。 PolyBase 允许批量加载而不是逐行加载数据。 PolyBase 可大大减少 Azure Synapse Analytics 的加载时间。
检查点键
对数据流源使用更改捕获选项时,ADF 会自动维护和管理检查点。 默认检查点键是数据流名称和管道名称的哈希。 如果对源表或文件夹使用动态模式,可能要替代此哈希并在此处设置自己的检查点键值。
日志记录级别
如果不需要数据流活动的每个管道执行完整地记录所有详细的遥测日志,则可根据需要将日志记录级别设置为“基本”或“无”。 在“详细”模式(默认)下执行数据流时,要求此服务在数据转换期间完整地记录每个分区级别的活动。 该操作成本昂贵,因此仅在进行故障排除时启用“详细”模式可优化整体数据流和管道性能。 “基本”模式仅记录转换持续时间,而“无”模式仅提供持续时间摘要。
接收器属性
数据流中的分组功能既可以设置接收器的执行顺序,又可以使用相同的组号将接收器分组在一起。 为帮助管理组,可以要求此服务在同一组中运行接收器,以并行运行。 还可以将接收器组设置为继续,即使其中一个接收器遇到错误仍可这样设置。
数据流接收器的默认行为是以串行方式顺序执行每个接收器,并且在接收器中遇到错误时使数据流失败。 此外,除非进入数据流属性并为接收器设置不同的优先级,否则所有接收器均默认为同一组。
仅第一行
此选项仅适用于已为“输出到活动”启用了缓存接收器的数据流。 直接注入到管道中的数据流的输出限制为 2MB。 设置“仅第一行”有助于在将数据流活动输出直接注入到管道时限制来自数据流的数据输出。
将数据流参数化
参数化数据集
如果数据流使用参数化数据集,请在“设置”选项卡中设置参数值。
参数化数据流
如果数据流已参数化,请在“参数”选项卡中设置数据流参数的动态值。可以使用管道表达式语言或数据流表达式语言来分配动态或文字参数值。 有关详细信息,请参阅数据流参数。
参数化计算属性。
如果使用自动解析 Azure Integration Runtime 并为 compute.coreCount 和 compute.computeType 指定值,则可以将核心数或计算类型参数化。
数据流活动的管道调试
若要执行通过数据流活动运行的调试管道,则必须通过顶部栏中的“数据流调试”滑块来打开数据流调试模式。 借助调试模式,可以针对活动的 Spark 群集运行数据流。 有关详细信息,请参阅调试模式。
调试管道针对活动的调试群集运行,而不是针对“数据流”活动设置中指定的集成运行时环境运行。 在启动调试模式时,可以选择调试计算环境。
监视数据流活动
数据流活动具有特殊的监视体验,你可以在其中查看分区、暂存时间和数据世系信息。 通过“操作”下的眼镜图标打开监视窗格。 有关详细信息,请参阅监视数据流。
使用数据流活动会导致后续活动
数据流活动输出有关写入每个接收器的行数和从每个源读取的行数的指标。 这些结果将在活动运行结果的 output
部分中返回。 返回的指标采用以下 json 格式。
{
"runStatus": {
"metrics": {
"<your sink name1>": {
"rowsWritten": <number of rows written>,
"sinkProcessingTime": <sink processing time in ms>,
"sources": {
"<your source name1>": {
"rowsRead": <number of rows read>
},
"<your source name2>": {
"rowsRead": <number of rows read>
},
...
}
},
"<your sink name2>": {
...
},
...
}
}
}
例如,要获取在名为“dataflowActivity”的活动中写入接收器“sink1”的行数,可使用 @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten
。
要获取从该接收器中使用的源“source1”读取的行数,可使用 @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead
。
注意
如果接收器写入的行数为零,则它将不会显示在指标中。 可以使用 contains
函数来验证行是否存在。 例如,contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1')
检查是否存在写入 sink1 的任意行。
相关内容
参阅支持的控制流活动: