Atividade ForEach no Azure Data Factory e no Azure Synapse Analytics

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

A Atividade ForEach define um fluxo de controle repetitivo em um pipeline do Azure Data Factory ou Sinapse. Esta atividade é utilizada para iterar uma coleção e executa atividades especificadas em ciclo. A implementação de ciclo desta atividade é semelhante à estrutura de ciclo Foreach nas linguagens de programação.

Criar uma atividade ForEach com a interface do usuário

Para usar uma atividade ForEach em um pipeline, conclua as seguintes etapas:

  1. Você pode usar qualquer variável de tipo de matriz ou saídas de outras atividades como entrada para sua atividade ForEach. Para criar uma variável de matriz, selecione o plano de fundo da tela de pipeline e, em seguida, selecione a guia Variáveis para adicionar uma variável de tipo de matriz, conforme mostrado abaixo.

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. Procure ForEach no painel Atividades do pipeline e arraste uma atividade ForEach para a tela do pipeline.

  3. Selecione a nova atividade ForEach na tela, se ainda não estiver selecionada, e sua guia Configurações , para editar seus detalhes.

    Shows the UI for a Filter activity.

  4. Selecione o campo Itens e, em seguida, selecione o link Adicionar conteúdo dinâmico para abrir o painel do editor de conteúdo dinâmico.

    Shows the  Add dynamic content  link for the Items property.

  5. Selecione sua matriz de entrada a ser filtrada no editor de conteúdo dinâmico. Neste exemplo, selecionamos a variável criada na primeira etapa.

    Shows the dynamic content editor with the variable created in the first step selected

  6. Selecione o editor de Atividades na atividade ForEach para adicionar uma ou mais atividades a serem executadas para cada item na matriz Itens de entrada.

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. Em qualquer atividade que você criar dentro da atividade ForEach, você pode fazer referência ao item atual pelo qual a atividade ForEach está iterando na lista Itens . Você pode fazer referência ao item atual em qualquer lugar em que possa usar uma expressão dinâmica para especificar um valor de propriedade. No editor de conteúdo dinâmico, selecione o iterador ForEach para retornar o item atual.

    Shows the dynamic content editor with the ForEach iterator selected.

Sintaxe

As propriedades são descritas mais adiante neste artigo. A propriedade items é a coleção e cada item na coleção é referido usando o @item() conforme mostrado na sintaxe a seguir:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Propriedades do tipo

Property Description Valores permitidos Obrigatório
nome Nome da atividade para cada atividade. Cadeia (de carateres) Sim
tipo Deve ser definido como ForEach Cadeia (de carateres) Sim
isSequencial Especifica se o loop deve ser executado sequencialmente ou em paralelo. Máximo de 50 iterações de loop podem ser executadas de uma só vez em paralelo). Por exemplo, se você tiver uma atividade ForEach iterando sobre uma atividade de cópia com 10 conjuntos de dados de origem e coletor diferentes com isSequential definido como False, todas as cópias serão executadas de uma só vez. O padrão é False.

Se "isSequential" estiver definido como False, verifique se há uma configuração correta para executar vários executáveis. Caso contrário, essa propriedade deve ser usada com cuidado para evitar incorrer em conflitos de gravação. Para obter mais informações, consulte a seção Execução paralela.
Boolean Não O padrão é False.
batchCount Contagem de lotes a ser usada para controlar o número de execução paralela (quando isSequential é definido como false). Este é o limite superior de simultaneidade, mas a atividade para cada uma nem sempre será executada nesse número Inteiro (máximo 50) Não O padrão é 20.
Items Uma expressão que retorna uma matriz JSON a ser iterada. Expressão (que retorna uma matriz JSON) Sim
Atividades As atividades a serem executadas. Lista de Atividades Sim

Execução paralela

Se isSequential estiver definido como false, a atividade itera em paralelo com um máximo de 50 iterações simultâneas. Esta definição deve ser utilizada com precaução. Se as iterações simultâneas estiverem gravando na mesma pasta, mas em arquivos diferentes, essa abordagem é boa. Se as iterações simultâneas estiverem gravando simultaneamente no mesmo arquivo, essa abordagem provavelmente causará um erro.

Linguagem de expressão de iteração

Na atividade ForEach, forneça uma matriz a ser iterada para os itens de propriedade." Use @item() para iterar sobre uma única enumeração na atividade ForEach. Por exemplo, se items for uma matriz: [1, 2, 3], @item() retorna 1 na primeira iteração, 2 na segunda iteração e 3 na terceira iteração. Você também pode usar @range(0,10) a expressão like para iterar dez vezes começando com 0 terminando com 9.

Iteração em uma única atividade

Cenário: copie do mesmo arquivo de origem no Blob do Azure para vários arquivos de destino no Blob do Azure.

Definição de pipeline

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definição do conjunto de dados de Blob

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Executar valores de parâmetro

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Iterar em várias atividades

É possível iterar em várias atividades (por exemplo: copiar e atividades da Web) em uma atividade ForEach. Nesse cenário, recomendamos que você abstraia várias atividades em um pipeline separado. Em seguida, você pode usar a atividade ExecutePipeline no pipeline com a atividade ForEach para invocar o pipeline separado com várias atividades.

Sintaxe

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Exemplo

Cenário: itere sobre um InnerPipeline dentro de uma atividade ForEach com a atividade Execute Pipeline. O pipeline interno copia com definições de esquema parametrizadas.

Definição de Master Pipeline

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definição de pipeline interno

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definição do conjunto de dados de origem

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definição do conjunto de dados do coletor

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parâmetros do pipeline mestre

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Agregando saídas

Para agregar saídas de cada atividade, utilize Variáveis e Anexe atividade de variável.

Primeiro, declare uma arrayvariável no pipeline. Em seguida, invoque a atividade Append Variable dentro de cada loop foreach . Posteriormente, você pode recuperar a agregação de sua matriz.

Limitações e soluções alternativas

Aqui estão algumas limitações da atividade ForEach e soluções alternativas sugeridas.

Limitação Solução
Não é possível aninhar um loop ForEach dentro de outro loop ForEach (ou um loop TUntil ). Projete um pipeline de dois níveis em que o pipeline externo com o loop ForEach externo itere sobre um pipeline interno com o loop aninhado.
A atividade ForEach tem um máximo de 50 para processamento paralelo e um máximo batchCount de 100.000 itens. Projete um pipeline de dois níveis em que o pipeline externo com a atividade ForEach itere sobre um pipeline interno.
SetVariable não pode ser usado dentro de uma atividade ForEach que é executada em paralelo, pois as variáveis são globais para todo o pipeline, elas não têm escopo para um ForEach ou qualquer outra atividade. Considere usar ForEach sequencial ou usar Execute Pipeline dentro de ForEach (Variable/Parameter handled in child Pipeline).

Veja outras atividades de fluxo de controle suportadas: