Condividi tramite


Attività ForEach in Azure Data Factory e Azure Synapse Analytics

SI APPLICA A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Provare Data Factory in Microsoft Fabric, una soluzione di analisi all-in-one per le aziende. Microsoft Fabric copre tutto, dallo spostamento dati al data science, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Vedere le informazioni su come iniziare una nuova prova gratuita!

L'attività ForEach definisce un flusso di controllo ripetuto in una pipeline di Azure Data Factory o Synapse. Questa attività viene usata per eseguire l'iterazione di una raccolta e attività specifiche in un ciclo. L'implementazione di cicli di questa attività è simile alla struttura di esecuzione in ciclo Foreach nei linguaggi di programmazione.

Creare un'attività ForEach con l'interfaccia utente

Per usare un'attività ForEach in una pipeline, seguire questa procedura:

  1. È possibile usare qualsiasi variabile di tipo di matrice o output di altre attività come input per l'attività ForEach. Per creare una variabile di matrice, selezionare lo sfondo dell'area di disegno della pipeline e quindi selezionare la scheda Variabili per aggiungere una variabile di tipo matrice, come illustrato di seguito.

    Mostra un'area di disegno della pipeline vuota con una variabile di tipo matrice aggiunta alla pipeline.

  2. Cercare ForEach nel riquadro Attività pipeline e trascinare un'attività ForEach nell'area di disegno della pipeline.

  3. Selezionare la nuova attività ForEach nell'area di disegno se non è già selezionata e la relativa scheda Impostazioni per modificarne i dettagli.

    Mostra l'interfaccia utente per un'attività Filtro.

  4. Selezionare il campo Elementi e quindi selezionare il collegamento Aggiungi contenuto dinamico per aprire il riquadro dell'editor di contenuto dinamico.

    Mostra il collegamento  Aggiungi contenuto dinamico  per la proprietà Elementi.

  5. Selezionare la matrice di input da filtrare nell'editor di contenuto dinamico. In questo esempio viene selezionata la variabile creata nel primo passaggio.

    Mostra l'editor di contenuto dinamico con selezionata la variabile creata nel primo passaggio

  6. Selezionare l'editor Attività nell'attività ForEach per aggiungere una o più attività da eseguire per ogni elemento nella matrice di input Elementi.

    Mostra il pulsante dell'editor Attività nell'attività ForEach nella finestra dell'editor della pipeline.

  7. In tutte le attività create all'interno dell'attività ForEach è possibile fare riferimento all'elemento corrente di cui l'attività ForEach esegue l'iterazione dall'elenco Elementi. È possibile fare riferimento all'elemento corrente ovunque sia possibile usare un'espressione dinamica per specificare un valore della proprietà. Nell'editor di contenuto dinamico selezionare l'iteratore ForEach per restituire l'elemento corrente.

    Mostra l'editor di contenuto dinamico con l'iteratore ForEach selezionato.

Sintassi

Le proprietà sono descritte più avanti in questo articolo. La proprietà items rappresenta la raccolta e ogni elemento della raccolta viene definito tramite @item(), come illustrato nella sintassi seguente:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Proprietà del tipo

Proprietà Descrizione Valori consentiti Obbligatoria
name Nome dell'attività ForEach. String
type Deve essere impostato su ForEach String
isSequential Specifica se il ciclo deve essere eseguito in sequenza o in parallelo. È possibile eseguire al massimo 50 iterazioni del ciclo simultanee in parallelo. Se si dispone ad esempio di un'iterazione di attività ForEach su un'attività di copia con 10 set di dati di origine e sink diversi con isSequential impostato su False, tutte le copie vengono eseguite simultaneamente. L'impostazione predefinita è False.

Se "isSequential" è impostato su False, assicurarsi che sia presente una configurazione corretta per usare più eseguibili. In caso contrario, questa proprietà deve essere usata con attenzione per evitare di incorrere in conflitti di scrittura. Per altre informazioni, vedere la sezione Esecuzione parallela.
Booleano No. L'impostazione predefinita è False.
batchCount Numero di batch da usare per controllare il numero di esecuzioni parallele, quando isSequential è impostato su False. Questo è il limite di concorrenza superiore, ma l'attività for-each non viene sempre eseguita in corrispondenza di questo numero Valore intero (massimo 50) No. Il valore predefinito è 20.
Articoli Un'espressione che restituisce una matrice JSON su cui eseguire un'iterazione. Espressione (che restituisce una matrice JSON)
Attività Le attività da eseguire. Elenco di attività

Esecuzione parallela

Se isSequential è impostato su false, l'attività esegue le iterazioni in parallelo con un massimo di 50 iterazioni simultanee. Questa impostazione deve essere usata con cautela. Se le iterazioni simultanee scrivono nella stessa cartella, ma in file diversi, non ci sono problemi. Se le iterazioni simultanee scrivono contemporaneamente in esattamente lo stesso file, questo approccio causa un errore.

Linguaggio delle espressioni di iterazione

Nell'attività ForEach, fornire una matrice di cui eseguire un'iterazione per la proprietà items." Usare @item() per eseguire l'iterazione su un'unica enumerazione nell'attività ForEach. Ad esempio, se items è una matrice: [1, 2, 3], @item() restituisce 1 nella prima iterazione, 2 nella seconda iterazione e 3 nella terza iterazione. È anche possibile usare @range(0,10) come espressione per eseguire l'iterazione dieci volte a partire da 0 e terminando con 9.

Iterazione su una singola attività

Scenario: copia dello stesso file di origine del BLOB di Azure in più file di destinazione nel BLOB di Azure.

Definizione della pipeline

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definizione del set di dati BLOB

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Valori del parametro di esecuzione

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Eseguire l'iterazione su più attività

È possibile eseguire l'iterazione su più attività (ad esempio attività di copia e web) in un'attività ForEach. In questo scenario, si consiglia di astrarre più attività in una pipeline distinta. È quindi possibile usare l'attività ExecutePipeline nella pipeline con l'attività ForEach per richiamare la pipeline distinta con più attività.

Sintassi

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Esempio

Scenario: iterazione su un oggetto InnerPipeline in un'attività ForEach con l'attività di ExecutePipeline. La pipeline interna viene copiata con le definizioni dello schema parametrizzate.

Definizione della pipeline master

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definizione della pipeline interna

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definizione del set di dati di origine

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definizione del set di dati sink

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parametri della pipeline master

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Aggregazione di output

Per aggregare gli output dell'attività foreach, usare Variabili e l'attività Accoda variabile.

Innanzitutto, dichiarare arraycome variabile nella pipeline. Quindi, richiamare l'attività Aggiungi variabile all'interno di ogni ciclo foreach. Successivamente, è possibile recuperare l'aggregazione dall'array.

Limitazioni e soluzioni alternative

Di seguito vengono descritte alcune limitazioni dell'attività ForEach con le soluzioni alternative suggerite.

Limitazione Soluzione alternativa
Non è possibile annidare un ciclo ForEach all'interno di un altro ciclo ForEach (o un ciclo Until). Progettare una pipeline a due livelli, in cui la pipeline esterna con il ciclo ForEach esterno esegue l'iterazione su una pipeline interna con il ciclo annidato.
Per l'attività ForEach sono previsti un batchCount massimo di 50 per l'elaborazione parallela e un massimo di 100.000 elementi. Progettare una pipeline a due livelli in cui la pipeline esterna con l'attività ForEach esegue l'iterazione su una pipeline interna.
SetVariable non può essere usato all'interno di un'attività ForEach eseguita in parallelo perché le variabili sono globali per l'intera pipeline, non usano ForEach o altre attività come ambito. È consigliabile usare ForEach sequenziale o usare Execute Pipeline in ForEach (variabile/parametro gestito nella pipeline figlio).

Vedere altre attività del flusso di controllo supportate: