Atividade ForEach no Azure Data Factory e no Azure Synapse Analytics
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Dica
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange desde movimentação de dados até ciência de dados, análise em tempo real, business intelligence e relatórios. Saiba como iniciar uma avaliação gratuita!
A atividade ForEach define um fluxo de controle de repetição em um pipeline do Azure Data Factory ou do Synapse. Essa atividade é usada para iterar em uma coleção e executa atividades especificadas em um loop. A implementação dessa atividade em loop é semelhante à estrutura em loop Foreach nas linguagens de programação.
Criar uma atividade ForEach com a interface do usuário
Para usar uma atividade ForEach em um pipeline, conclua as etapas a seguir:
Você pode usar qualquer variável de tipo de matriz ou as saídas de outras atividades como a entrada para a atividade ForEach. Para criar uma variável de matriz, selecione a tela de fundo da tela do pipeline e, em seguida, selecione a guia Variáveis para adicionar uma variável de tipo de matriz, conforme mostrado abaixo.
Procure ForEach no painel Atividades do pipeline e arraste uma atividade ForEach para a tela do pipeline.
Selecione a nova atividade ForEach na tela, se ainda não estiver selecionada, e a guia Configurações para editar os detalhes.
Selecione o campo Itens e, em seguida, o link Adicionar conteúdo dinâmico para abrir o painel do editor de conteúdo dinâmico.
Selecione a matriz de entrada a ser filtrada no editor de conteúdo dinâmico. Neste exemplo, selecionamos a variável criada na primeira etapa.
Selecione o editor de Atividades na atividade ForEach para adicionar uma ou mais atividades a serem executadas para cada item na matriz de Itens de entrada.
Em todas as atividades criadas na atividade ForEach, você pode referenciar o item atual que a atividade ForEach está iterando na lista de Itens. Você pode referenciar o item atual em qualquer lugar em que possa usar uma expressão dinâmica para especificar um valor de propriedade. No editor de conteúdo dinâmico, selecione o iterador ForEach para retornar o item atual.
Sintaxe
As propriedades são descritas posteriormente neste artigo. A propriedade dos itens é a coleção e cada item na coleção é referenciada usando o @item()
conforme mostrado na sintaxe a seguir:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
Propriedades de tipo
Propriedade | Descrição | Valores permitidos | Obrigatório |
---|---|---|---|
name | Nome da atividade for-each. | String | Sim |
type | Deve ser definido como ForEach | String | Sim |
isSequential | Especifica se o loop deve ser executado em sequência ou em paralelo. O máximo de 50 iterações de loop pode ser executado ao mesmo tempo em paralelo. Por exemplo, se você tiver uma atividade ForEach iterando sobre uma atividade de cópia com 10 conjuntos de dados de origem e de coletor diferentes com isSequential definido como False, todas as cópias serão executadas ao mesmo tempo. O padrão é Falso. Se "isSequential" for definido como False, certifique-se de que há uma configuração correta para executar vários executáveis. Caso contrário, essa propriedade deverá ser usada com cuidado para evitar incorrer em conflitos de gravação. Para obter mais informações, consulte a seção Execução paralela. |
Boolean | Não. O padrão é Falso. |
batchCount | Contagem de lotes a ser usada para controlar o número de execuções paralelas (quando isSequential estiver definido como falso). Esse é o limite de simultaneidade superior, mas a atividade for-each nem sempre será executada nesse número | Inteiro (máximo de 50) | Não. O padrão é 20. |
Itens | Uma expressão que retorna uma matriz JSON a ser iterada. | Expressão (que retorna uma matriz JSON) | Sim |
Atividades | As atividades a serem executadas. | Lista de atividades | Sim |
Execução paralela
Se isSequential estiver definido como “false”, a atividade iterará em paralelo com um máximo de 50 iterações simultâneas. Essa configuração deve ser usada com cuidado. Se as iterações simultâneas estiverem gravando na mesma pasta, mas em diferentes arquivos, essa abordagem será boa. Se as iterações simultâneas estiverem gravando simultaneamente no mesmo arquivo, essa abordagem provavelmente causará um erro.
Linguagem de expressão de iteração
Na atividade ForEach, forneça uma matriz a ser iterada para os itens de propriedade. Use @item()
para iterar para uma única enumeração na atividade ForEach. Por exemplo, se items for uma matriz: [1, 2, 3], @item()
retornará 1 na primeira iteração, 2 na segunda iteração e 3 na terceira iteração. Você também pode usar @range(0,10)
como uma expressão para iterar dez vezes, começando com 0 e terminando com 9.
Iterar em uma única atividade
Cenário: cópia do mesmo arquivo de origem no Blob do Azure para vários arquivos de destino no Blob do Azure.
Definição de pipeline
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Definição de conjunto de dados de blob
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
Valores de parâmetros de execução
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
Iterar em várias atividades
É possível iterar várias atividades (por exemplo: as atividades de cópia e da Web) em uma atividade ForEach. Nesse cenário, recomendamos que você abstraia várias atividades em um pipeline separado. Em seguida, você pode usar a atividade ExecutePipeline no pipeline com a atividade ForEach para invocar o pipeline separado com várias atividades.
Sintaxe
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
Exemplo
Cenário: iterar em um InnerPipeline dentro de uma atividade ForEach com a atividade de execução de pipeline. O pipeline interno copia com definições de esquema parametrizadas.
Definição de pipeline mestre
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
Definição de pipeline interno
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
Definição de conjunto de dados de origem
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Definição de conjunto de dados de coletor
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Parâmetros de pipeline mestre
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
Agregar saídas
Para agregar saídas da atividade foreach, use a atividade Variáveis e Acrescentar Variável.
Primeiro, declare uma array
variável no pipeline. Em seguida, invoque a atividade Append Variable dentro de cada loop foreach. Posteriormente, você pode recuperar a agregação na sua matriz.
Limitações e soluções alternativas
Aqui estão algumas limitações da atividade ForEach e sugestões de soluções alternativas.
Limitações | Solução alternativa |
---|---|
Você não pode aninhar um loop ForEach dentro de outro loop ForEach (ou um loop Until). | Projete um pipeline de dois níveis em que o pipeline externo com o loop ForEach externo itera sobre um pipeline interno com o loop aninhado. |
A atividade ForEach tem um máximo batchCount de 50 para processamento paralelo e um máximo de 100.000 itens. |
Projete um pipeline de dois níveis onde o pipeline externo com a atividade ForEach itera sobre um pipeline interno. |
SetVariable não pode ser usado dentro de uma atividade ForEach que é executada em paralelo, pois as variáveis são globais para todo o pipeline, elas não têm escopo para uma atividade ForEach nem para qualquer outra. | Considere usar o ForEach sequencial ou usar Executar Pipeline dentro de ForEach (variável/parâmetro administrado no pipeline filho). |
Conteúdo relacionado
Veja outras atividades de fluxo de controle com suporte: