次の方法で共有


Azure Data Factory および Azure Synapse Analytics での ForEach アクティビティ

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。

ForEach アクティビティでは、Azure Data Factory または Synapse Analytics パイプラインで繰り返される制御フローを定義します。 このアクティビティは、コレクションを反復処理するために使用され、指定されたアクティビティをループで実行します。 このアクティビティのループの実装は、プログラミング言語の Foreach ループ構造に似ています。

UI を使用して ForEach アクティビティを作成する

パイプライン内で ForEach アクティビティを使用するには、次の手順を実行します。

  1. ForEach アクティビティの入力として、任意の配列型変数または他のアクティビティからの出力を使用できます。 配列変数を作成するには、パイプライン キャンバスの背景を選択し、[変数] タブを選択して、次に示すように配列型の変数を追加します。

    パイプラインに配列型変数が追加された空のパイプライン キャンバスを表示しています。

  2. パイプラインの [アクティビティ] ペイン内で ForEach を検索し、ForEach アクティビティをパイプライン キャンバスにドラッグします。

  3. キャンバス上で新しい ForEach アクティビティ (まだ選択されていない場合)、その [設定] タブの順に選択して、その詳細を編集します。

    Filter アクティビティの UI を表示しています。

  4. [項目] フィールドを選び、[動的なコンテンツの追加] リンクを選んで、動的コンテンツ エディター ペインを開きます。

    Items プロパティの [動的なコンテンツの追加] リンクを表示しています。

  5. 動的コンテンツ エディターでフィルター処理する入力配列を選びます。 この例では、最初の手順で作成した変数を選びます。

    最初の手順で作成された変数が選択された動的コンテンツ エディターを表示しています

  6. ForEach アクティビティのアクティビティ エディターを選択して、入力項目配列の各項目に対して実行される 1 つ以上のアクティビティを追加します。

    パイプライン エディター ウィンドウの ForEach アクティビティの [アクティビティ エディター] ボタンを表示しています。

  7. ForEach アクティビティ内に作成するすべてのアクティビティでは、ForEach アクティビティによって反復処理される現在の項目を [項目] 一覧から参照できます。 動的な式を使用してプロパティ値を指定できるあらゆる場所で、現在の項目を参照できます。 動的コンテンツ エディターで、現在の項目を返す ForEach 反復子を選択します。

    ForEach 反復子が選択された動的コンテンツ エディターを表示しています。

構文

プロパティは、この記事の後の方で説明します。 items プロパティはコレクションであり、次の構文に示すように、コレクション内の各項目は @item() を使用して参照されます。

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

型のプロパティ

プロパティ 説明 使用できる値 必須
name ForEach アクティビティの名前。 String はい
type ForEach に設定する必要があります String はい
isSequential ループを順番に実行するか、または並行して実行するかを指定します。 一度に最大 50 のループ反復処理を並行して実行できます。 たとえば、isSequential が False に設定された状態で、10 個の異なるソースとシンク データセットがあるコピー アクティビティに対して ForEach アクティビティ反復処理を実行すると、一度にすべてのコピーが実行されます。 既定値は False です。

"isSequential" が False に設定されている場合は、複数の実行可能ファイルを実行するための正しい構成が存在することを確認してください。 そうでない場合は、書き込みの競合が発生しないようにするために、このプロパティを慎重に使用する必要があります。 詳細については、「Parallel execution (並列実行)」セクションを参照してください。
Boolean いいえ。 既定値は False です。
batchCount 並列実行の数を制御するために使用するバッチの数 (isSequential が false に設定されている場合)。 これはコンカレンシーの上限ですが、For-Each アクティビティは常にこの数値で実行されるわけではありません Integer (最大 50) いいえ。 既定値は 20 です。
アイテム 反復処理される JSON 配列を返す式。 式 (これは JSON 配列を返します) はい
Activities 実行されるアクティビティ。 アクティビティの一覧 はい

並列実行

isSequential が false に設定されている場合、このアクティビティは最大 50 の同時実行反復処理と並行して反復処理します。 この設定は、慎重に使用する必要があります。 同時実行反復処理が同じフォルダーではあっても、異なるファイルへの書き込みである場合、このアプローチは適切です。 同時実行反復処理がまったく同じファイルへの同時書き込みである場合、このアプローチはエラーの原因になる可能性があります。

反復処理の式言語

ForEach アクティビティでは、プロパティ items に対して反復処理する配列を指定します。@item() を使用して ForEach アクティビティの単一の列挙体を反復処理します。 たとえば、items が配列: [1, 2, 3] である場合、@item() は最初の反復処理で 1 を、2 番目の反復処理で 2 を、3 番目の反復処理で 3 を返します。 また、@range(0,10) like 式を使用して、0 から 9 までの10 回の反復処理を行うこともできます。

1 つのアクティビティを反復処理する

シナリオ: Azure BLOB 内の同じソース ファイルから Azure BLOB 内の複数の宛先ファイルにコピーします。

パイプラインの定義

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

BLOB データセットの定義

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

実行パラメーターの値

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

複数のアクティビティを反復処理する

ForEach アクティビティで複数のアクティビティ (例: コピー アクティビティと Web アクティビティ) を反復処理できます。 このシナリオでは、複数のアクティビティを個別のパイプラインに抽象化することをお勧めします。 次に、ForEach アクティビティを含むパイプラインの ExecutePipeline アクティビティを使用して、複数のアクティビティを含む個別のパイプラインを呼び出すことができます。

構文

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

シナリオ: ExecutePipeline アクティビティを使用して ForEach アクティビティ内で InnerPipeline を反復処理します。 内部パイプラインは、パラメーター化されたスキーマ定義でコピーします。

マスター パイプラインの定義

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

内部パイプラインの定義

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

ソース データセットの定義

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

シンク データセットの定義

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

マスター パイプラインのパラメーター

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

出力の集計

foreach アクティビティの出力を集計するには、VariablesAppend Variable アクティビティを使用してください。

最初に、パイプライン内で array "変数" を宣言します。 次に、各 foreach ループ内で Append Variable アクティビティを呼び出します。 その後、配列から集計を取得できます。

制限事項と回避策

ForEach アクティビティと提案される回避策のいくつかの制限を次に示します。

制限事項 回避策
別の ForEach ループ (または Until ループ) 内に ForEach ループを入れ子にすることはできません。 入れ子にされたループが含まれる内側パイプライン上で外側の ForEach ループが含まれる外側パイプラインが反復される 2 段のパイプラインを設計します。
ForEach アクティビティには並列処理のために最大 50 の batchCount と、最大 100,000 の項目が含まれています。 内側パイプライン上で ForEach アクティビティが含まれる外側パイプラインが反復される 2 段のパイプラインを設計します。
SetVariable は、並列で実行される ForEach アクティビティ内では使用できません。この変数は、パイプライン全体に対してグローバルであるため、ForEach やその他のアクティビティは対象とされません。 順次 ForEach を使用するか、ForEach (子パイプラインで処理される変数/パラメーター) 内で Execute パイプラインを使用することを検討してください。

サポートされている他の制御フロー アクティビティを参照してください。