Activité ForEach dans Azure Data Factory et Azure Synapse Analytics

S’APPLIQUE À : Azure Data Factory Azure Synapse Analytics

Conseil

Essayez Data Factory dans Microsoft Fabric, une solution d’analyse tout-en-un pour les entreprises. Microsoft Fabric couvre tous les aspects, du déplacement des données à la science des données, en passant par l’analyse en temps réel, l’aide à la décision et la création de rapports. Découvrez comment démarrer un nouvel essai gratuitement !

L’activité ForEach définit un flux de contrôle répétitif dans un pipeline Azure Data Factory ou Synapse. Elle permet d’effectuer une itération sur une collection, et exécute des activités spécifiées dans une boucle. L’implémentation en boucle de cette activité est semblable à la structure d’exécution en boucle de Foreach dans les langages de programmation.

Créer une activité ForEach avec l’interface utilisateur

Pour utiliser une activité ForEach dans un pipeline, effectuez les étapes suivantes :

  1. Vous pouvez utiliser n’importe quelle variable de type tableau ou les sorties d’autres activités comme entrée pour votre activité ForEach. Pour créer une variable de tableau, sélectionnez l’arrière-plan du canevas de pipeline, puis sélectionnez l’onglet Variables pour ajouter une variable de type tableau, comme illustré ci-dessous.

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. Recherchez ForEach dans le volet Activités de pipeline, puis faites glisser une activité ForEach vers le canevas du pipeline.

  3. Sélectionnez la nouvelle activité ForEach sur le canevas si elle ne l’est pas déjà, et son onglet Paramètres pour en modifier les détails.

    Shows the UI for a Filter activity.

  4. Sélectionnez le champ Éléments, puis sélectionnez le lien Ajouter du contenu dynamique pour ouvrir le volet de l’éditeur de contenu dynamique.

    Shows the  Add dynamic content  link for the Items property.

  5. Sélectionnez le tableau d’entrée à filtrer dans l’éditeur de contenu dynamique. Dans cet exemple, nous sélectionnons la variable créée à la première étape.

    Shows the dynamic content editor with the variable created in the first step selected

  6. Sélectionnez l’éditeur d’activités dans l’activité ForEach pour ajouter une ou plusieurs activités à exécuter pour chaque élément du tableau Éléments d’entrée.

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. Dans toutes les activités que vous créez au sein de l’activité ForEach, vous pouvez référencer l’élément actuel sur lequel l’activité ForEach est en train d’itérer à partir de la liste Éléments. Vous pouvez référencer l’élément actuel partout où vous pouvez utiliser une expression dynamique pour spécifier une valeur de propriété. Dans l’éditeur de contenu dynamique, sélectionnez l’itérateur ForEach pour renvoyer l’élément actuel.

    Shows the dynamic content editor with the ForEach iterator selected.

Syntaxe

Les propriétés sont décrites plus loin dans cet article. La propriété items est la collection, et chaque élément dans celle-ci est référencé à l’aide de @item(), comme illustré dans la syntaxe suivante :

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Propriétés type

Propriété Description Valeurs autorisées Obligatoire
name Nom de l’activité ForEach. String Oui
type Doit être défini sur ForEach String Oui
isSequential Spécifie si la boucle doit être exécutée de façon séquentielle ou parallèle. Le nombre maximal d’itérations de boucle exécutables simultanément en parallèle est de 50. Par exemple, si vous avez une activité ForEach effectuant une itération sur une activité de copie portant sur 10 jeux de données de source et de récepteur différents avec la valeur isSequential définie sur False, toutes les copies sont exécutées en même temps. La valeur par défaut est FALSE.

Si la valeur de « isSequential » est définie sur False, assurez-vous qu’il existe une configuration correcte pour exécuter plusieurs exécutables. Autrement, cette propriété doit être utilisée avec précaution pour éviter des conflits d’écriture. Pour plus d’informations, voir la section Exécution parallèle.
Boolean Non. La valeur par défaut est FALSE.
batchCount Nombre de lots à utiliser pour contrôler le nombre d’exécutions en parallèle (lorsque isSequential est défini sur false). Il s’agit de la limite supérieure de concurrence, mais l’activité ForEach n’atteindra pas toujours ce nombre lors de son exécution. Entier (maximum 50) Non. La valeur par défaut est 20.
Éléments Expression qui retourne un tableau JSON auquel appliquer l’itération. Expression (qui retourne un tableau JSON) Oui
Activités Activités à exécuter. Liste des activités Oui

Exécution parallèle

Si la valeur de isSequential est définie sur False, l’activité effectue l’itération en parallèle avec un maximum de 50 itérations simultanées. Ce paramètre doit être utilisé avec précaution. Si les itérations simultanées écrivent dans le même dossier, mais dans des fichiers différents, cette approche convient. Si les itérations simultanées écrivent en même temps dans le même fichier, cette approche entraînera très probablement une erreur.

Langage d’expression de l’itération

Dans l’activité ForEach, fournissez un tableau sur lequel effectuer l’itération pour la propriété items. Utilisez @item() pour itérer sur une énumération unique dans l’activité ForEach. Par exemple, si items est un tableau [1, 2, 3], @item() retourne 1 dans la première itération, 2 dans la deuxième, et 3 dans la troisième. Vous pouvez également utiliser @range(0,10) comme expression pour effectuer une itération dix fois commençant par 0 et se terminant par 9.

Itération sur une activité unique

Scénario : Copier à partir du même fichier source dans Stockage Blob Azure vers plusieurs fichiers de destination dans Stockage Blob Azure.

Définition de pipeline

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Définition du jeu de données d'objet blob

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Valeurs de paramètre d’exécution

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Itérer sur plusieurs activités

Il est possible d’itérer sur plusieurs activités (par exemple : des activités web et de copie) dans une activité ForEach. Dans ce scénario, nous vous recommandons d’abstraire plusieurs activités dans un pipeline distinct. Ensuite, vous pouvez utiliser l’activité ExecutePipeline dans le pipeline avec l’activité ForEach pour appeler le pipeline séparé avec plusieurs activités.

Syntaxe

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Exemple

Scénario : Itérer sur un pipeline interne au sein d’une activité ForEach avec l’activité d’exécution du pipeline. Le pipeline interne copie avec des définitions de schéma paramétrées.

Définition du pipeline principal

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Définition de pipeline interne

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Définition du jeu de données de la source

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Définition du jeu de données du récepteur

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Paramètres du pipeline principal

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Agréger des sorties

Pour agréger les sorties de l’activité foreach, utilisez des Variables et l’activité Append Variable.

Commencez par déclarer une arrayvariable dans le pipeline. Puis appelez l'activité Append Variable dans chaque boucle ForEach. Vous pouvez ensuite récupérer l'agrégation à partir de votre tableau.

Limitations et solutions de contournement

Voici quelques limitations de l’activité ForEach et des suggestions de solutions de contournement.

Limitation Solution de contournement
Vous ne pouvez pas imbriquer une boucle ForEach à l’intérieur d’une autre boucle ForEach (ou une boucle Until). Concevez un pipeline à deux niveaux où le pipeline externe contenant la boucle ForEach externe itère sur un pipeline interne contenant la boucle imbriquée.
Pour chaque activité ForEach, batchCount a une valeur maximale de 50 pour le traitement parallèle et un maximum de 100 000 éléments. Concevez un pipeline à deux niveaux où le pipeline externe contenant l’activité ForEach itère sur un pipeline interne.
SetVariable ne peut pas être utilisé dans une activité ForEach qui s’exécute en parallèle, car les variables sont générales pour l’ensemble du pipeline. Elles ne sont pas étendues à une activité ForEach ou à une autre activité. Envisagez d’utiliser ForEach séquentiel ou d’exécuter le pipeline d’exécution dans ForEach (variable/paramètre géré dans le pipeline enfant).

Consultez d’autres activités de flux de contrôle prises en charge :