Actividad ForEach de Azure Data Factory y Azure Synapse Analytics

SE APLICA A: Azure Data Factory Azure Synapse Analytics

Sugerencia

Pruebe Data Factory en Microsoft Fabric, una solución de análisis todo en uno para empresas. Microsoft Fabric abarca todo, desde el movimiento de datos hasta la ciencia de datos, el análisis en tiempo real, la inteligencia empresarial y los informes. Obtenga información sobre cómo iniciar una nueva evaluación gratuita.

La actividad ForEach define un flujo de control recurrente en una canalización de Azure Data Factory o Synapse. Esta actividad se usa para iterar una colección y ejecuta las actividades especificadas en un bucle. La implementación del bucle de esta actividad es similar a la estructura de bucle ForEach de los lenguajes de programación.

Creación de una actividad ForEach con la interfaz de usuario

Para usar una actividad ForEach en una canalización, complete los pasos siguientes:

  1. Puede usar cualquier variable de tipo de matriz o salidas de otras actividades como entrada para la actividad ForEach. Para crear una variable de matriz, seleccione el fondo del lienzo de canalización y, luego, seleccione la pestaña Variables para agregar una variable de tipo de matriz como se muestra a continuación.

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. Busque ForEach en el panel Actividades de canalización y arrastre una actividad ForEach al lienzo de canalización.

  3. Seleccione la nueva actividad ForEach en el lienzo si aún no está seleccionada y su pestaña Configuración para editar sus detalles.

    Shows the UI for a Filter activity.

  4. Seleccione el campo Elementos y, después, el vínculo Incorporación de contenido dinámico para abrir el panel del editor de contenido dinámico.

    Shows the  Add dynamic content  link for the Items property.

  5. Seleccione la matriz de entrada que se va a filtrar en el editor de contenido dinámico. En este ejemplo, hemos seleccionado la variable creada en el primer paso.

    Shows the dynamic content editor with the variable created in the first step selected

  6. Seleccione el editor Actividades en la actividad ForEach para agregar una o varias actividades que se ejecutarán para cada elemento de la matriz Elementos de entrada.

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. En cualquier actividad que cree en la actividad ForEach, puede hacer referencia al elemento actual mediante el que la actividad ForEach está iterando de la lista Elementos. Puede hacer referencia al elemento actual en cualquier lugar donde pueda usar una expresión dinámica para especificar un valor de propiedad. En el editor de contenido dinámico, seleccione el iterador ForEach para devolver el elemento actual.

    Shows the dynamic content editor with the ForEach iterator selected.

Sintaxis

Las propiedades se describen más adelante en este artículo. La propiedad items es la colección y cada elemento de la colección se reconoce por usar @item(), como se muestra en la sintaxis siguiente:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Propiedades de tipo

Propiedad Descripción Valores permitidos Obligatorio
name Nombre de la actividad for-each. String
type Se debe establecer en ForEach String
isSequential Especifica si el bucle se debe ejecutar en secuencia o en paralelo. Se pueden ejecutar un máximo de 50 iteraciones de bucle a la vez en paralelo. Por ejemplo, si tiene una actividad ForEach que itera una actividad de copia con 10 conjuntos de datos de origen y receptor distintos con isSequential establecido en False, todas las copias se ejecutan a la vez. El valor predeterminado es False.

Si "isSequential" está establecido en False, asegúrese de que haya una configuración correcta para ejecutar varios archivos ejecutables. De lo contrario, esta propiedad se debe usar con precaución para no incurrir en conflictos de escritura. Para más información, consulte la sección Ejecución en paralelo.
Boolean No. El valor predeterminado es False.
batchCount Número de lotes que se usará para controlar el número de la ejecución en paralelo (cuando isSequential está establecido en false). Este es el límite de simultaneidad superior, pero la actividad for-each no siempre se ejecutará en este número. Entero (50 como máximo) No. El valor predeterminado es 20.
Elementos Una expresión que devuelve una matriz JSON que se iterará. Expresión (que devuelve una matriz JSON)
Actividades Las actividades que se ejecutarán. Lista de actividades

Ejecución en paralelo

Si isSequential está establecido en "false", la actividad itera en paralelo con un máximo de 50 iteraciones simultáneas. Esta configuración se debe usar con precaución. Si las iteraciones simultáneas escriben en la misma carpeta pero en distintos archivos, este enfoque es correcto. Si las iteraciones simultáneas escriben al mismo tiempo exactamente en el mismo archivo, es más probable que este enfoque provoque un error.

Lenguaje de expresión de iteración

En la actividad ForEach, proporcione una matriz que se va a iterar para los elementos de propiedad. Use @item() para iterar una sola enumeración en la actividad ForEach. Por ejemplo, si la propiedad items es una matriz: [1, 2, 3], @item() devuelve 1 en la primera iteración, 2 en la segunda y 3 en la tercera. También puede usar @range(0,10) como expresión para iterar diez veces comenzando con 0 y terminando con 9.

Iteración de una sola actividad

Escenario: Copia del mismo archivo de origen en un blob de Azure a varios archivos de destino en un blob de Azure.

Definición de la canalización

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definición del conjunto de datos de blob

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Valores de parámetro de ejecución

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Iteración de varias actividades

Es posible iterar varias actividades (por ejemplo: actividades de copia y web) en una actividad ForEach. En este escenario, se recomienda abstraer varias actividades en una canalización independiente. Luego puede usar la actividad ExecutePipeline en la canalización con la actividad ForEach para invocar la canalización independiente con varias actividades.

Sintaxis

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Ejemplo

Escenario: Iteración de una canalización interna dentro de una actividad ForEach con la actividad de ejecución de canalización. La canalización interna copia con definiciones de esquema parametrizadas.

Definición de la canalización principal

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definición de la canalización interna

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definición del conjunto de datos de origen

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definición del conjunto de datos de receptor

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parámetros de la canalización principal

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Agregación de salidas

Para agregar salidas de la actividad foreach, utilice Variables y la actividad Append Variable.

Primero, declare una variablearray en la canalización. A continuación, invoque la actividad Append Variable dentro de cada bucle foreach. Posteriormente, puede recuperar la agregación de la matriz.

Limitaciones y soluciones alternativas

A continuación se indican algunas de las limitaciones de la actividad ForEach y las soluciones alternativas sugeridas.

Limitación Solución alternativa
No se puede anidar un bucle ForEach dentro de otro bucle ForEach (o un bucle Until). Diseñar una canalización de dos niveles, donde la canalización externa con el bucle ForEach exterior recorre en iteración una canalización interna con el bucle anidado.
La actividad ForEach tiene un valor máximo de batchCount de 50 para procesamiento en paralelo y un máximo de 100 000 elementos. Diseñar una canalización de dos niveles, donde la canalización externa con la actividad ForEach recorre en iteración una canalización interna.
SetVariable no se puede utilizar dentro de una actividad ForEach que se ejecuta en paralelo, ya que las variables son globales para toda la canalización, no se limitan a una instrucción ForEach o a cualquier otra actividad. Considere la posibilidad de usar una instrucción ForEach secuencial o ejecutar la canalización dentro de ForEach (variable/parámetro administrados en la canalización secundaria).

Vea otras actividades de flujo de control admitidas: