Действие ForEach в Фабрике данных Azure и Azure Synapse Analytics
ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics
Совет
Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !
Действие ForEach определяет повторяющийся поток управления в конвейере Фабрики данных Azure или Synapse. Это действие используется для выполнения итерации коллекции и выполняет указанные в цикле действия. Реализация цикла этого действия аналогична структуре цикла Foreach на языках программирования.
Создание действия ForEach с помощью пользовательского интерфейса
Чтобы использовать действие ForEach в конвейере, выполните следующие действия:
В качестве входных данных для действия ForEach можно использовать любую переменную типа массива или выходные данные из других действий. Чтобы создать переменную массива, выберите фон холста конвейера, а затем перейдите на вкладку Переменные, чтобы добавить переменную типа массива, как показано ниже.
Выполните поиск элемента ForEach на панели конвейера "Действия" и перетащите действие ForEach на холст конвейера.
Выберите новое действие ForEach на панели холста, если оно еще не выбрано, и перейдите на вкладку Параметры, чтобы изменить сведения о нем.
Выберите поле Элементы, а затем щелкните ссылку Добавить динамическое содержимое, чтобы открыть панель редактора динамического содержимого.
Выберите входной массив для фильтрации в редакторе динамического содержимого. В этом примере мы выбираем переменную, созданную в первом шаге.
Выберите редактор действий в действии ForEach, чтобы добавить одно или несколько действий, выполняемых для каждого элемента в массиве входных элементов.
В любых действиях, создаваемых в рамках действия ForEach, можно ссылаться на текущий элемент, который многократно используется действием ForEach, из списка Элементы. Вы можете ссылаться на текущий элемент в любом месте, где можно использовать динамическое выражение для указания значения свойства. В редакторе динамического содержимого выберите итератор ForEach для возврата текущего элемента.
Синтаксис
Свойства описаны далее в этой статье. Свойство элементов является коллекцией, а каждый элемент в коллекции определяется с помощью @item()
, как показано в следующем синтаксисе:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
Свойства типа
Свойство | Description | Допустимые значения | Обязательное поле |
---|---|---|---|
name | Имя действия ForEach. | Строка | Да |
type | Должно быть задано значение ForEach. | Строка | Да |
isSequential | Определяет, как следует выполнять цикл: последовательно или параллельно. В параллельном режиме может выполняться одновременно не более 50 итераций цикла. Например, если действие ForEach выполняет итерацию действия копирования с 10 разными наборами данных источника и приемника и isSequential имеет значение false, все копии будут выполняться одновременно. Значение по умолчанию — False. Если isSequential имеет значение false, убедитесь в наличии правильной конфигурации для запуска нескольких исполняемых файлов. В противном случае это свойство следует использовать с осторожностью, чтобы исключить конфликты записи. Дополнительные сведения см. в разделе Параллельное выполнение. |
Логический | № Значение по умолчанию — False. |
batchCount | Число пакетов, используемое для управления числом параллельных выполнений (если параметр isSequential имеет значение false). Это максимальный предел параллелизма, однако для действий ForEach не всегда используется это число. | Целое число (максимум 50) | № Значение по умолчанию — 20. |
Товаров | Выражение, возвращающее массив JSON для итерации. | Выражение (возвращающее массив JSON) | Да |
Процедуры | Действия для выполнения. | Список действий | Да |
Параллельное выполнение
Если isSequential имеет значение false, итерация действия выполняется параллельно (с максимум 50 параллельными итерациями). Этот параметр следует использовать с осторожностью. Этот метод подходит, если параллельные итерации выполняют запись в ту же папку, но в разные файлы. Если параллельные итерации выполняют запись в тот же файл, такой подход обычно вызывает ошибку.
Язык выражений итерации
В действии ForEach можно указать массив для итерации по свойству Элементы. Используйте @item()
для итерации по одному перечислению в действии ForEach. Например, если items представляет собой массив: [1, 2, 3], @item()
возвращает 1 в первой итерации, 2 во второй итерации и 3 в третьей итерации. Можно также использовать выражение наподобие @range(0,10)
, чтобы выполнять десять итераций, начиная с 0 и заканчивая 9.
Итерация отдельного действия
Сценарий: копирование из того же исходного файла в большом двоичном объекте Azure в несколько конечных файлов в большом двоичном объекте Azure.
Определение конвейера
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Определение набора данных большого двоичного объекта
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
Значения параметров запуска
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
Итерация нескольких действий
Возможно выполнять итерацию нескольких действий (например, копирование и веб-действие) в действии ForEach. В этом сценарии рекомендуется абстрагировать несколько действий в отдельном конвейере. Затем с помощью действия ExecutePipeline в конвейере с действием ForEach вызовите отдельный конвейер с несколькими действиями.
Синтаксис
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
Пример
Сценарий: итерация по внутреннему конвейеру в действии ForEach с действием выполнения конвейера. Внутренний конвейер копируется с параметризованными определениями схемы.
Определение главного конвейера
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
Определение внутреннего конвейера
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
Определение исходного набора данных
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Определение набора данных приемника
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Параметры главного конвейера
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
Агрегирование данных вывода
Чтобы агрегировать выходные данные для каждого действия foreach, используйте действия Variables (переменные) и Append Variable (добавить переменную).
Сначала объявите переменную array
в конвейере. После этого вызывайте действие Добавления переменной внутри каждого цикла foreach. Впоследствии вы можете получить агрегат из вашего массива.
Ограничения и методы обхода
Ниже описаны некоторые ограничения для действия ForEach и рекомендуемые обходные решения.
Ограничение | Обходное решение |
---|---|
Не вкладывайте цикл ForEach в другой цикл ForEach (или в цикл Until). | Разработайте двухуровневый конвейер, где внешний конвейер с внешним циклом ForEach выполняет итерацию по внутреннему конвейеру с вложенным циклом. |
Количество операций параллельной обработки batchCount для действия ForEach не может превышать 50, максимальное число элементов — 100 000. |
Разработайте двухуровневый конвейер, где внешний конвейер с действием ForEach выполняет итерацию по внутреннему конвейеру. |
SetVariable нельзя использовать внутри действия ForEach, которое выполняется параллельно, так как переменные являются глобальными для всего конвейера и не входят в область ForEach и другие действия. | Можно использовать ForEach последовательно или действие Execute Pipeline внутри ForEach (переменную или параметр, обрабатываемые в дочернем конвейере). |
Связанный контент
Ознакомьтесь с другими поддерживаемыми действиями потока управления: