Attività ForEach in Azure Data Factory e Azure Synapse Analytics
SI APPLICA A: Azure Data Factory Azure Synapse Analytics
Suggerimento
Provare Data Factory in Microsoft Fabric, una soluzione di analisi all-in-one per le aziende. Microsoft Fabric copre tutto, dallo spostamento dati al data science, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Vedere le informazioni su come iniziare una nuova prova gratuita!
L'attività ForEach definisce un flusso di controllo ripetuto in una pipeline di Azure Data Factory o Synapse. Questa attività viene usata per eseguire l'iterazione di una raccolta e attività specifiche in un ciclo. L'implementazione di cicli di questa attività è simile alla struttura di esecuzione in ciclo Foreach nei linguaggi di programmazione.
Creare un'attività ForEach con l'interfaccia utente
Per usare un'attività ForEach in una pipeline, seguire questa procedura:
È possibile usare qualsiasi variabile di tipo di matrice o output di altre attività come input per l'attività ForEach. Per creare una variabile di matrice, selezionare lo sfondo dell'area di disegno della pipeline e quindi selezionare la scheda Variabili per aggiungere una variabile di tipo matrice, come illustrato di seguito.
Cercare ForEach nel riquadro Attività pipeline e trascinare un'attività ForEach nell'area di disegno della pipeline.
Selezionare la nuova attività ForEach nell'area di disegno se non è già selezionata e la relativa scheda Impostazioni per modificarne i dettagli.
Selezionare il campo Elementi e quindi selezionare il collegamento Aggiungi contenuto dinamico per aprire il riquadro dell'editor di contenuto dinamico.
Selezionare la matrice di input da filtrare nell'editor di contenuto dinamico. In questo esempio viene selezionata la variabile creata nel primo passaggio.
Selezionare l'editor Attività nell'attività ForEach per aggiungere una o più attività da eseguire per ogni elemento nella matrice di input Elementi.
In tutte le attività create all'interno dell'attività ForEach è possibile fare riferimento all'elemento corrente di cui l'attività ForEach esegue l'iterazione dall'elenco Elementi. È possibile fare riferimento all'elemento corrente ovunque sia possibile usare un'espressione dinamica per specificare un valore della proprietà. Nell'editor di contenuto dinamico selezionare l'iteratore ForEach per restituire l'elemento corrente.
Sintassi
Le proprietà sono descritte più avanti in questo articolo. La proprietà items rappresenta la raccolta e ogni elemento della raccolta viene definito tramite @item()
, come illustrato nella sintassi seguente:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
Proprietà del tipo
Proprietà | Descrizione | Valori consentiti | Obbligatoria |
---|---|---|---|
name | Nome dell'attività ForEach. | String | Sì |
type | Deve essere impostato su ForEach | String | Sì |
isSequential | Specifica se il ciclo deve essere eseguito in sequenza o in parallelo. È possibile eseguire al massimo 50 iterazioni del ciclo simultanee in parallelo. Se si dispone ad esempio di un'iterazione di attività ForEach su un'attività di copia con 10 set di dati di origine e sink diversi con isSequential impostato su False, tutte le copie vengono eseguite simultaneamente. L'impostazione predefinita è False. Se "isSequential" è impostato su False, assicurarsi che sia presente una configurazione corretta per usare più eseguibili. In caso contrario, questa proprietà deve essere usata con attenzione per evitare di incorrere in conflitti di scrittura. Per altre informazioni, vedere la sezione Esecuzione parallela. |
Booleano | No. L'impostazione predefinita è False. |
batchCount | Numero di batch da usare per controllare il numero di esecuzioni parallele, quando isSequential è impostato su False. Questo è il limite di concorrenza superiore, ma l'attività for-each non viene sempre eseguita in corrispondenza di questo numero | Valore intero (massimo 50) | No. Il valore predefinito è 20. |
Articoli | Un'espressione che restituisce una matrice JSON su cui eseguire un'iterazione. | Espressione (che restituisce una matrice JSON) | Sì |
Attività | Le attività da eseguire. | Elenco di attività | Sì |
Esecuzione parallela
Se isSequential è impostato su false, l'attività esegue le iterazioni in parallelo con un massimo di 50 iterazioni simultanee. Questa impostazione deve essere usata con cautela. Se le iterazioni simultanee scrivono nella stessa cartella, ma in file diversi, non ci sono problemi. Se le iterazioni simultanee scrivono contemporaneamente in esattamente lo stesso file, questo approccio causa un errore.
Linguaggio delle espressioni di iterazione
Nell'attività ForEach, fornire una matrice di cui eseguire un'iterazione per la proprietà items." Usare @item()
per eseguire l'iterazione su un'unica enumerazione nell'attività ForEach. Ad esempio, se items è una matrice: [1, 2, 3], @item()
restituisce 1 nella prima iterazione, 2 nella seconda iterazione e 3 nella terza iterazione. È anche possibile usare @range(0,10)
come espressione per eseguire l'iterazione dieci volte a partire da 0 e terminando con 9.
Iterazione su una singola attività
Scenario: copia dello stesso file di origine del BLOB di Azure in più file di destinazione nel BLOB di Azure.
Definizione della pipeline
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Definizione del set di dati BLOB
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
Valori del parametro di esecuzione
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
Eseguire l'iterazione su più attività
È possibile eseguire l'iterazione su più attività (ad esempio attività di copia e web) in un'attività ForEach. In questo scenario, si consiglia di astrarre più attività in una pipeline distinta. È quindi possibile usare l'attività ExecutePipeline nella pipeline con l'attività ForEach per richiamare la pipeline distinta con più attività.
Sintassi
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
Esempio
Scenario: iterazione su un oggetto InnerPipeline in un'attività ForEach con l'attività di ExecutePipeline. La pipeline interna viene copiata con le definizioni dello schema parametrizzate.
Definizione della pipeline master
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
Definizione della pipeline interna
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
Definizione del set di dati di origine
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Definizione del set di dati sink
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Parametri della pipeline master
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
Aggregazione di output
Per aggregare gli output dell'attività foreach, usare Variabili e l'attività Accoda variabile.
Innanzitutto, dichiarare array
come variabile nella pipeline. Quindi, richiamare l'attività Aggiungi variabile all'interno di ogni ciclo foreach. Successivamente, è possibile recuperare l'aggregazione dall'array.
Limitazioni e soluzioni alternative
Di seguito vengono descritte alcune limitazioni dell'attività ForEach con le soluzioni alternative suggerite.
Limitazione | Soluzione alternativa |
---|---|
Non è possibile annidare un ciclo ForEach all'interno di un altro ciclo ForEach (o un ciclo Until). | Progettare una pipeline a due livelli, in cui la pipeline esterna con il ciclo ForEach esterno esegue l'iterazione su una pipeline interna con il ciclo annidato. |
Per l'attività ForEach sono previsti un batchCount massimo di 50 per l'elaborazione parallela e un massimo di 100.000 elementi. |
Progettare una pipeline a due livelli in cui la pipeline esterna con l'attività ForEach esegue l'iterazione su una pipeline interna. |
SetVariable non può essere usato all'interno di un'attività ForEach eseguita in parallelo perché le variabili sono globali per l'intera pipeline, non usano ForEach o altre attività come ambito. | È consigliabile usare ForEach sequenziale o usare Execute Pipeline in ForEach (variabile/parametro gestito nella pipeline figlio). |
Contenuto correlato
Vedere altre attività del flusso di controllo supportate: