Compartir vía


Actividad ForEach de Azure Data Factory y Azure Synapse Analytics

SE APLICA A: Azure Data Factory Azure Synapse Analytics

Sugerencia

Pruebe Data Factory en Microsoft Fabric, una solución de análisis todo en uno para empresas. Microsoft Fabric abarca todo, desde el movimiento de datos hasta la ciencia de datos, el análisis en tiempo real, la inteligencia empresarial y los informes. Obtenga información sobre cómo iniciar una nueva evaluación gratuita.

La actividad ForEach define un flujo de control recurrente en una canalización de Azure Data Factory o Synapse. Esta actividad se usa para iterar una colección y ejecuta las actividades especificadas en un bucle. La implementación del bucle de esta actividad es similar a la estructura de bucle ForEach de los lenguajes de programación.

Creación de una actividad ForEach con la interfaz de usuario

Para usar una actividad ForEach en una canalización, complete los pasos siguientes:

  1. Puede usar cualquier variable de tipo de matriz o salidas de otras actividades como entrada para la actividad ForEach. Para crear una variable de matriz, seleccione el fondo del lienzo de canalización y, luego, seleccione la pestaña Variables para agregar una variable de tipo de matriz como se muestra a continuación.

    Muestra un lienzo de canalización vacío con una variable de tipo de matriz agregada a la canalización.

  2. Busque ForEach en el panel Actividades de canalización y arrastre una actividad ForEach al lienzo de canalización.

  3. Seleccione la nueva actividad ForEach en el lienzo si aún no está seleccionada y su pestaña Configuración para editar sus detalles.

    Muestra la UI de la actividad de filtro.

  4. Seleccione el campo Elementos y, después, el vínculo Incorporación de contenido dinámico para abrir el panel del editor de contenido dinámico.

    Muestra el   Agregar contenido dinámico  vínculo para la propiedad Items.

  5. Seleccione la matriz de entrada que se va a filtrar en el editor de contenido dinámico. En este ejemplo, hemos seleccionado la variable creada en el primer paso.

    Muestra el editor de contenido dinámico con la variable creada en el primer paso seleccionada.

  6. Seleccione el editor Actividades en la actividad ForEach para agregar una o varias actividades que se ejecutarán para cada elemento de la matriz Elementos de entrada.

    Muestra el botón del editor Actividades en la actividad ForEach de la ventana del editor de la canalización.

  7. En cualquier actividad que cree en la actividad ForEach, puede hacer referencia al elemento actual mediante el que la actividad ForEach está iterando de la lista Elementos. Puede hacer referencia al elemento actual en cualquier lugar donde pueda usar una expresión dinámica para especificar un valor de propiedad. En el editor de contenido dinámico, seleccione el iterador ForEach para devolver el elemento actual.

    Muestra el editor de contenido dinámico con el iterador ForEach seleccionado.

Sintaxis

Las propiedades se describen más adelante en este artículo. La propiedad items es la colección y cada elemento de la colección se reconoce por usar @item(), como se muestra en la sintaxis siguiente:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Propiedades de tipo

Propiedad Descripción Valores permitidos Obligatorio
name Nombre de la actividad for-each. String
type Se debe establecer en ForEach String
isSequential Especifica si el bucle se debe ejecutar en secuencia o en paralelo. Se pueden ejecutar un máximo de 50 iteraciones de bucle a la vez en paralelo. Por ejemplo, si tiene una actividad ForEach que itera una actividad de copia con 10 conjuntos de datos de origen y receptor distintos con isSequential establecido en False, todas las copias se ejecutan a la vez. El valor predeterminado es False.

Si "isSequential" está establecido en False, asegúrese de que haya una configuración correcta para ejecutar varios archivos ejecutables. De lo contrario, esta propiedad se debe usar con precaución para no incurrir en conflictos de escritura. Para más información, consulte la sección Ejecución en paralelo.
Boolean No. El valor predeterminado es False.
batchCount Recuento de lotes que se va a usar para controlar el número de ejecuciones paralelas (cuando isSequential se establece en falso). Este es el límite de simultaneidad superior, pero la actividad for-each no siempre se ejecutará en este número. Entero (50 como máximo) No. El valor predeterminado es 20.
Elementos Una expresión que devuelve una matriz JSON que se iterará. Expresión (que devuelve una matriz JSON)
Actividades Las actividades que se ejecutarán. Lista de actividades

Ejecución en paralelo

Si isSequential está establecido en "false", la actividad itera en paralelo con un máximo de 50 iteraciones simultáneas. Esta configuración se debe usar con precaución. Si las iteraciones simultáneas escriben en la misma carpeta pero en distintos archivos, este enfoque es correcto. Si las iteraciones simultáneas escriben al mismo tiempo exactamente en el mismo archivo, es más probable que este enfoque provoque un error.

Lenguaje de expresión de iteración

En la actividad ForEach, proporcione una matriz que se va a iterar para los elementos de propiedad. Use @item() para iterar una sola enumeración en la actividad ForEach. Por ejemplo, si la propiedad items es una matriz: [1, 2, 3], @item() devuelve 1 en la primera iteración, 2 en la segunda y 3 en la tercera. También puede usar @range(0,10) como expresión para iterar diez veces comenzando con 0 y terminando con 9.

Iteración de una sola actividad

Escenario: Copia del mismo archivo de origen en un blob de Azure a varios archivos de destino en un blob de Azure.

Definición de la canalización

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definición del conjunto de datos de blob

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Valores de parámetro de ejecución

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Iteración de varias actividades

Es posible iterar varias actividades (por ejemplo: actividades de copia y web) en una actividad ForEach. En este escenario, se recomienda abstraer varias actividades en una canalización independiente. Luego puede usar la actividad ExecutePipeline en la canalización con la actividad ForEach para invocar la canalización independiente con varias actividades.

Sintaxis

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Ejemplo

Escenario: Iteración de una canalización interna dentro de una actividad ForEach con la actividad de ejecución de canalización. La canalización interna copia con definiciones de esquema parametrizadas.

Definición de la canalización principal

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definición de la canalización interna

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definición del conjunto de datos de origen

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definición del conjunto de datos de receptor

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parámetros de la canalización principal

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Agregación de salidas

Para agregar salidas de la actividad foreach, utilice Variables y la actividad Append Variable.

Primero, declare una variablearray en la canalización. A continuación, invoque la actividad Append Variable dentro de cada bucle foreach. Posteriormente, puede recuperar la agregación de la matriz.

Limitaciones y soluciones alternativas

A continuación se indican algunas de las limitaciones de la actividad ForEach y las soluciones alternativas sugeridas.

Limitación Solución alternativa
No se puede anidar un bucle ForEach dentro de otro bucle ForEach (o un bucle Until). Diseñar una canalización de dos niveles, donde la canalización externa con el bucle ForEach exterior recorre en iteración una canalización interna con el bucle anidado.
La actividad ForEach tiene un valor máximo de batchCount de 50 para procesamiento en paralelo y un máximo de 100 000 elementos. Diseñar una canalización de dos niveles, donde la canalización externa con la actividad ForEach recorre en iteración una canalización interna.
SetVariable no se puede utilizar dentro de una actividad ForEach que se ejecuta en paralelo, ya que las variables son globales para toda la canalización, no se limitan a una instrucción ForEach o a cualquier otra actividad. Considere la posibilidad de usar una instrucción ForEach secuencial o ejecutar la canalización dentro de ForEach (variable/parámetro administrados en la canalización secundaria).

Vea otras actividades de flujo de control admitidas: