Atividade ForEach no Azure Data Factory e no Azure Synapse Analytics

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Dica

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange desde movimentação de dados até ciência de dados, análise em tempo real, business intelligence e relatórios. Saiba como iniciar uma avaliação gratuita!

A atividade ForEach define um fluxo de controle de repetição em um pipeline do Azure Data Factory ou do Synapse. Essa atividade é usada para iterar em uma coleção e executa atividades especificadas em um loop. A implementação dessa atividade em loop é semelhante à estrutura em loop Foreach nas linguagens de programação.

Criar uma atividade ForEach com a interface do usuário

Para usar uma atividade ForEach em um pipeline, conclua as etapas a seguir:

  1. Você pode usar qualquer variável de tipo de matriz ou as saídas de outras atividades como a entrada para a atividade ForEach. Para criar uma variável de matriz, selecione a tela de fundo da tela do pipeline e, em seguida, selecione a guia Variáveis para adicionar uma variável de tipo de matriz, conforme mostrado abaixo.

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. Procure ForEach no painel Atividades do pipeline e arraste uma atividade ForEach para a tela do pipeline.

  3. Selecione a nova atividade ForEach na tela, se ainda não estiver selecionada, e a guia Configurações para editar os detalhes.

    Shows the UI for a Filter activity.

  4. Selecione o campo Itens e, em seguida, o link Adicionar conteúdo dinâmico para abrir o painel do editor de conteúdo dinâmico.

    Shows the  Add dynamic content  link for the Items property.

  5. Selecione a matriz de entrada a ser filtrada no editor de conteúdo dinâmico. Neste exemplo, selecionamos a variável criada na primeira etapa.

    Shows the dynamic content editor with the variable created in the first step selected

  6. Selecione o editor de Atividades na atividade ForEach para adicionar uma ou mais atividades a serem executadas para cada item na matriz de Itens de entrada.

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. Em todas as atividades criadas na atividade ForEach, você pode referenciar o item atual que a atividade ForEach está iterando na lista de Itens. Você pode referenciar o item atual em qualquer lugar em que possa usar uma expressão dinâmica para especificar um valor de propriedade. No editor de conteúdo dinâmico, selecione o iterador ForEach para retornar o item atual.

    Shows the dynamic content editor with the ForEach iterator selected.

Sintaxe

As propriedades são descritas posteriormente neste artigo. A propriedade dos itens é a coleção e cada item na coleção é referenciada usando o @item() conforme mostrado na sintaxe a seguir:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Propriedades de tipo

Propriedade Descrição Valores permitidos Obrigatório
name Nome da atividade for-each. String Sim
type Deve ser definido como ForEach String Sim
isSequential Especifica se o loop deve ser executado em sequência ou em paralelo. O máximo de 50 iterações de loop pode ser executado ao mesmo tempo em paralelo. Por exemplo, se você tiver uma atividade ForEach iterando sobre uma atividade de cópia com 10 conjuntos de dados de origem e de coletor diferentes com isSequential definido como False, todas as cópias serão executadas ao mesmo tempo. O padrão é Falso.

Se "isSequential" for definido como False, certifique-se de que há uma configuração correta para executar vários executáveis. Caso contrário, essa propriedade deverá ser usada com cuidado para evitar incorrer em conflitos de gravação. Para obter mais informações, consulte a seção Execução paralela.
Boolean Não. O padrão é Falso.
batchCount Contagem de lotes a ser usada para controlar o número de execução paralela (quando isSequential estiver definido como false). Esse é o limite de simultaneidade superior, mas a atividade for-each nem sempre será executada nesse número Inteiro (máximo de 50) Não. O padrão é 20.
Itens Uma expressão que retorna uma matriz JSON a ser iterada. Expressão (que retorna uma matriz JSON) Sim
Atividades As atividades a serem executadas. Lista de atividades Sim

Execução paralela

Se isSequential estiver definido como “false”, a atividade iterará em paralelo com um máximo de 50 iterações simultâneas. Essa configuração deve ser usada com cuidado. Se as iterações simultâneas estiverem gravando na mesma pasta, mas em diferentes arquivos, essa abordagem será boa. Se as iterações simultâneas estiverem gravando simultaneamente no mesmo arquivo, essa abordagem provavelmente causará um erro.

Linguagem de expressão de iteração

Na atividade ForEach, forneça uma matriz a ser iterada para os itens de propriedade. Use @item() para iterar para uma única enumeração na atividade ForEach. Por exemplo, se items for uma matriz: [1, 2, 3], @item() retornará 1 na primeira iteração, 2 na segunda iteração e 3 na terceira iteração. Você também pode usar @range(0,10) como uma expressão para iterar dez vezes, começando com 0 e terminando com 9.

Iterar em uma única atividade

Cenário: cópia do mesmo arquivo de origem no Blob do Azure para vários arquivos de destino no Blob do Azure.

Definição de pipeline

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definição de conjunto de dados de blob

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Valores de parâmetros de execução

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Iterar em várias atividades

É possível iterar várias atividades (por exemplo: as atividades de cópia e da Web) em uma atividade ForEach. Nesse cenário, recomendamos que você abstraia várias atividades em um pipeline separado. Em seguida, você pode usar a atividade ExecutePipeline no pipeline com a atividade ForEach para invocar o pipeline separado com várias atividades.

Sintaxe

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Exemplo

Cenário: iterar em um InnerPipeline dentro de uma atividade ForEach com a atividade de execução de pipeline. O pipeline interno copia com definições de esquema parametrizadas.

Definição de pipeline mestre

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definição de pipeline interno

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definição de conjunto de dados de origem

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definição de conjunto de dados de coletor

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parâmetros de pipeline mestre

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Agregar saídas

Para agregar saídas da atividade foreach, use a atividade Variáveis e Acrescentar Variável.

Primeiro, declare uma arrayvariável no pipeline. Em seguida, invoque a atividade Append Variable dentro de cada loop foreach. Posteriormente, você pode recuperar a agregação na sua matriz.

Limitações e soluções alternativas

Aqui estão algumas limitações da atividade ForEach e sugestões de soluções alternativas.

Limitações Solução alternativa
Você não pode aninhar um loop ForEach dentro de outro loop ForEach (ou um loop Until). Projete um pipeline de dois níveis em que o pipeline externo com o loop ForEach externo itera sobre um pipeline interno com o loop aninhado.
A atividade ForEach tem um máximo batchCount de 50 para processamento paralelo e um máximo de 100.000 itens. Projete um pipeline de dois níveis onde o pipeline externo com a atividade ForEach itera sobre um pipeline interno.
SetVariable não pode ser usado dentro de uma atividade ForEach que é executada em paralelo, pois as variáveis são globais para todo o pipeline, elas não têm escopo para uma atividade ForEach nem para qualquer outra. Considere usar o ForEach sequencial ou usar Executar Pipeline dentro de ForEach (variável/parâmetro administrado no pipeline filho).

Veja outras atividades de fluxo de controle com suporte: