Share via


ForEach-activiteit in Azure Data Factory en Azure Synapse Analytics

VAN TOEPASSING OP: Azure Data Factory Azure Synapse Analytics

Tip

Probeer Data Factory uit in Microsoft Fabric, een alles-in-één analyseoplossing voor ondernemingen. Microsoft Fabric omvat alles, van gegevensverplaatsing tot gegevenswetenschap, realtime analyses, business intelligence en rapportage. Meer informatie over het gratis starten van een nieuwe proefversie .

De ForEach-activiteit definieert een herhalende controlestroom in een Azure Data Factory- of Synapse-pijplijn. Deze activiteit wordt gebruikt om een verzameling te herhalen en voert opgegeven activiteiten uit in een lus. De lusimplementatie van deze activiteit is vergelijkbaar met Foreach-lusstructuur in computertalen.

Een ForEach-activiteit maken met de gebruikersinterface

Voer de volgende stappen uit om een ForEach-activiteit in een pijplijn te gebruiken:

  1. U kunt elke matrixtypevariabele of uitvoer van andere activiteiten gebruiken als invoer voor uw ForEach-activiteit. Als u een matrixvariabele wilt maken, selecteert u de achtergrond van het pijplijncanvas en selecteert u vervolgens het tabblad Variabelen om een matrixtypevariabele toe te voegen, zoals hieronder wordt weergegeven.

    Toont een leeg pijplijncanvas met een variabele van het matrixtype die aan de pijplijn is toegevoegd.

  2. Zoek naar ForEach in het deelvenster Pijplijnactiviteiten en sleep een ForEach-activiteit naar het pijplijncanvas.

  3. Selecteer de nieuwe ForEach-activiteit op het canvas als deze nog niet is geselecteerd en het tabblad Instellingen om de details ervan te bewerken.

    Toont de gebruikersinterface voor een filteractiviteit.

  4. Selecteer het veld Items en selecteer vervolgens de koppeling Dynamische inhoud toevoegen om het deelvenster dynamische inhoudseditor te openen.

    Toont de   Dynamische inhoud toevoegen  koppeling voor de eigenschap Items.

  5. Selecteer de invoermatrix die u wilt filteren in de dynamische inhoudseditor. In dit voorbeeld selecteren we de variabele die in de eerste stap is gemaakt.

    Toont de dynamische inhoudseditor met de variabele die in de eerste stap is gemaakt

  6. Selecteer de activiteiteneditor in de ForEach-activiteit om een of meer activiteiten toe te voegen die moeten worden uitgevoerd voor elk item in de matrix Invoeritems .

    Toont de knop Activiteiteneditor op de ForEach-activiteit in het venster pijplijneditor.

  7. In alle activiteiten die u in de ForEach-activiteit maakt, kunt u verwijzen naar het huidige item dat de ForEach-activiteit doorloopt vanuit de lijst Items . U kunt naar het huidige item verwijzen waar u een dynamische expressie kunt gebruiken om een eigenschapswaarde op te geven. Selecteer in de dynamische inhoudseditor de ForEach-iterator om het huidige item te retourneren.

    Toont de dynamische inhoudseditor met de ForEach-iterator geselecteerd.

Syntaxis

De eigenschappen worden verderop in dit artikel beschreven. De eigenschap Items is de verzameling en elk item in de verzameling wordt aangeduid met behulp van de @item() volgende syntaxis:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Typeeigenschappen

Eigenschappen Beschrijving Toegestane waarden Vereist
naam Naam van de activiteit voor elke activiteit. String Ja
type Moet zijn ingesteld op ForEach String Ja
isSequentiële Hiermee geeft u op of de lus opeenvolgend of parallel moet worden uitgevoerd. Er kunnen maximaal 50 herhalingen tegelijk worden uitgevoerd). Als u bijvoorbeeld een ForEach-activiteit hebt die door een kopieeractiviteit wordt herhaald met 10 verschillende bron- en sinkgegevenssets met isSequentiële set op False, worden alle kopieën in één keer uitgevoerd. De standaardwaarde is Onwaar.

Als 'isSequential' is ingesteld op False, controleert u of er een juiste configuratie is om meerdere uitvoerbare bestanden uit te voeren. Anders moet deze eigenschap met voorzichtigheid worden gebruikt om schrijfconflicten te voorkomen. Zie de sectie Parallelle uitvoering voor meer informatie.
Booleaanse waarde Nee De standaardwaarde is Onwaar.
batchCount Het aantal batches dat moet worden gebruikt voor het beheren van het aantal parallelle uitvoeringen (wanneer isSequentiële waarde is ingesteld op false). Dit is de bovengrens voor gelijktijdigheid, maar de activiteit voor elke activiteit wordt niet altijd uitgevoerd op dit getal Geheel getal (maximaal 50) Nee De standaardwaarde is 20.
Artikelen Een expressie die een JSON-matrix retourneert die moet worden geïmiteerd. Expressie (die een JSON-matrix retourneert) Ja
Activiteiten De activiteiten die moeten worden uitgevoerd. Lijst met activiteiten Ja

Parallelle uitvoering

Als isSequentiële waarde is ingesteld op onwaar, wordt de activiteit parallel herhaald met maximaal 50 gelijktijdige iteraties. Deze instelling moet voorzichtig worden gebruikt. Als de gelijktijdige iteraties naar dezelfde map maar naar verschillende bestanden schrijven, is deze methode prima. Als de gelijktijdige iteraties gelijktijdig naar hetzelfde bestand worden geschreven, veroorzaakt deze benadering waarschijnlijk een fout.

Iteratie-expressietaal

Geef in de ForEach-activiteit een matrix op die moet worden ge curseerd voor de eigenschapsitems.' Gebruik @item() dit om één opsomming in ForEach-activiteit te herhalen. Als items bijvoorbeeld een matrix is: [1, 2, 3], @item() retourneert 1 in de eerste iteratie, 2 in de tweede iteratie en 3 in de derde iteratie. U kunt ook een dergelijke expressie gebruiken @range(0,10) om tien keer te herhalen vanaf 0 eindigend op 9.

Herhalen van één activiteit

Scenario: Kopieer vanuit hetzelfde bronbestand in Azure Blob naar meerdere doelbestanden in Azure Blob.

Pijplijndefinitie

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definitie van blobgegevensset

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Parameterwaarden uitvoeren

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Meerdere activiteiten herhalen

Het is mogelijk om meerdere activiteiten (bijvoorbeeld kopieer- en webactiviteiten) in een ForEach-activiteit te herhalen. In dit scenario raden we u aan om meerdere activiteiten in een afzonderlijke pijplijn te abstraheren. Vervolgens kunt u de ExecutePipeline-activiteit in de pijplijn gebruiken met ForEach-activiteit om de afzonderlijke pijplijn aan te roepen met meerdere activiteiten.

Syntaxis

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Opmerking

Scenario: Iterate over an InnerPipeline binnen a ForEach activity with Execute Pipeline activity. De binnenste pijplijn kopieert met schemadefinities die zijn geparameteriseerd.

Hoofdpijplijndefinitie

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definitie van interne pijplijn

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definitie van brongegevensset

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definitie van sinkgegevensset

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Hoofdpijplijnparameters

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Uitvoer samenvoegen

Als u uitvoer van foreach-activiteit wilt aggregeren, gebruikt u variabelen en toevoegvariabeleactiviteit.

Declareer eerst een array variabele in de pijplijn. Roep vervolgens de activiteit Toevoegvariabele in elke foreach-lus aan. Vervolgens kunt u de aggregatie ophalen uit uw matrix.

Beperkingen en oplossingen

Hier volgen enkele beperkingen van de ForEach-activiteit en voorgestelde tijdelijke oplossingen.

Beperking Tijdelijke oplossing
U kunt een ForEach-lus niet nesten in een andere ForEach-lus (of een Until-lus). Ontwerp een pijplijn op twee niveaus waarbij de buitenste pijplijn met de buitenste ForEach-lus wordt herhaald via een binnenpijplijn met de geneste lus.
De ForEach-activiteit heeft maximaal batchCount 50 voor parallelle verwerking en maximaal 100.000 items. Ontwerp een pijplijn met twee niveaus waarbij de buitenste pijplijn met de ForEach-activiteit wordt herhaald over een binnenste pijplijn.
SetVariable kan niet worden gebruikt binnen een ForEach-activiteit die parallel wordt uitgevoerd, omdat de variabelen globaal zijn voor de hele pijplijn, ze zijn niet gericht op een ForEach of een andere activiteit. Overweeg sequentiële ForEach te gebruiken of Execute Pipeline te gebruiken in ForEach (variabele/parameter die wordt verwerkt in onderliggende pijplijn).

Bekijk andere ondersteunde controlestroomactiviteiten: