ForEach-aktivitet i Azure Data Factory och Azure Synapse Analytics
GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics
Dricks
Prova Data Factory i Microsoft Fabric, en allt-i-ett-analyslösning för företag. Microsoft Fabric omfattar allt från dataflytt till datavetenskap, realtidsanalys, business intelligence och rapportering. Lär dig hur du startar en ny utvärderingsversion kostnadsfritt!
ForEach-aktiviteten definierar ett upprepat kontrollflöde i en Azure Data Factory- eller Synapse-pipeline. Den här aktiviteten används till att iterera över en samling och kör angivna aktiviteter i en loop. Implementeringen av loopen för den här aktiviteten liknar Foreach-loopstrukturen i programmeringsspråk.
Skapa en ForEach-aktivitet med användargränssnittet
Utför följande steg för att använda en ForEach-aktivitet i en pipeline:
Du kan använda valfri matristypvariabel eller utdata från andra aktiviteter som indata för din ForEach-aktivitet. Om du vill skapa en matrisvariabel väljer du bakgrunden för pipelinearbetsytan och väljer sedan fliken Variabler för att lägga till en matristypvariabel enligt nedan.
Sök efter ForEach i fönstret Pipelineaktiviteter och dra en ForEach-aktivitet till pipelinearbetsytan.
Välj den nya ForEach-aktiviteten på arbetsytan om den inte redan är markerad och fliken Inställningar för att redigera dess information.
Välj fältet Objekt och välj sedan länken Lägg till dynamiskt innehåll för att öppna fönstret redigerare för dynamiskt innehåll.
Välj den indatamatris som ska filtreras i redigeraren för dynamiskt innehåll. I det här exemplet väljer vi variabeln som skapades i det första steget.
Välj aktivitetsredigeraren i ForEach-aktiviteten för att lägga till en eller flera aktiviteter som ska köras för varje objekt i matrisen för indataobjekt.
I alla aktiviteter som du skapar i ForEach-aktiviteten kan du referera till det aktuella objektet som ForEach-aktiviteten itererar igenom från objektlistan. Du kan referera till det aktuella objektet var som helst där du kan använda ett dynamiskt uttryck för att ange ett egenskapsvärde. I redigeraren för dynamiskt innehåll väljer du ForEach-iteratorn för att returnera det aktuella objektet.
Syntax
Egenskaperna beskrivs senare i den här artikeln. Objektegenskapen är samlingen och varje objekt i samlingen refereras till med hjälp @item()
av det som visas i följande syntax:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
Typegenskaper
Property | beskrivning | Tillåtna värden | Obligatoriskt |
---|---|---|---|
name | Namnet på aktiviteten för varje aktivitet. | String | Ja |
type | Måste anges till ForEach | String | Ja |
isSequential | Anger om loopen ska köras sekventiellt eller parallellt. Maximalt 50 loop-iterationer kan köras samtidigt parallellt). Om du till exempel har en ForEach-aktivitet som itererar över en kopieringsaktivitet med 10 olika käll- och mottagardatauppsättningar med isSequential inställd på False, körs alla kopior samtidigt. Standardvärdet är Falskt. Om "isSequential" är inställt på False kontrollerar du att det finns en korrekt konfiguration för att köra flera körbara filer. I annat fall bör den här egenskapen användas med försiktighet för att undvika skrivkonflikter. Mer information finns i avsnittet Parallell körning . |
Booleskt | Nej. Standardvärdet är Falskt. |
batchCount | Batchantal som ska användas för att kontrollera antalet parallella körningar (när isSequential är inställt på false). Det här är den övre samtidighetsgränsen, men för varje aktivitet körs inte alltid vid det här talet | Heltal (högst 50) | Nej. Standardvärdet är 20. |
Artiklar | Ett uttryck som returnerar en JSON-matris som ska itereras över. | Uttryck (som returnerar en JSON-matris) | Ja |
Aktiviteter | De aktiviteter som ska köras. | Lista över aktiviteter | Ja |
Parallell körning
Om isSequential är inställt på false itereras aktiviteten parallellt med högst 50 samtidiga iterationer. Den här inställningen bör användas med försiktighet. Om samtidiga iterationer skrivs till samma mapp men till olika filer är den här metoden bra. Om samtidiga iterationer skrivs samtidigt till exakt samma fil orsakar den här metoden troligen ett fel.
Iterationsuttrycksspråk
I ForEach-aktiviteten anger du en matris som ska itereras över för egenskapsobjekten." Använd @item()
för att iterera över en enda uppräkning i ForEach-aktivitet. Om objekt till exempel är en matris: [1, 2, 3], @item()
returnerar 1 i den första iterationen, 2 i den andra iterationen och 3 i den tredje iterationen. Du kan också använda @range(0,10)
like-uttryck för att iterera tio gånger från och med 0 som slutar med 9.
Iterera över en enskild aktivitet
Scenario: Kopiera från samma källfil i Azure Blob till flera målfiler i Azure Blob.
Pipelinedefinition
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Definition av blobdatauppsättning
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
Kör parametervärden
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
Iterera över flera aktiviteter
Det går att iterera över flera aktiviteter (till exempel kopiera och webbaktiviteter) i en ForEach-aktivitet. I det här scenariot rekommenderar vi att du abstraherar flera aktiviteter till en separat pipeline. Sedan kan du använda aktiviteten ExecutePipeline i pipelinen med ForEach-aktiviteten för att anropa den separata pipelinen med flera aktiviteter.
Syntax
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
Exempel
Scenario: Iterera över en InnerPipeline inom en ForEach-aktivitet med Aktiviteten Kör pipeline. Den inre pipelinen kopieras med schemadefinitioner parametriserade.
Huvudpipelinedefinition
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
Definition av inre pipeline
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
Definition av källdatauppsättning
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Datauppsättningsdefinition för mottagare
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Huvudpipelineparametrar
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
Aggregera utdata
Om du vill aggregera utdata från foreach-aktivitet använder du aktiviteten Variabler och Tilläggsvariabel .
Deklarera först en array
variabel i pipelinen. Anropa sedan aktiviteten Lägg till variabel i varje foreach-loop. Därefter kan du hämta aggregeringen från matrisen.
Begränsningar och lösningar
Här följer några begränsningar för ForEach-aktiviteten och föreslagna lösningar.
Begränsning | Lösning |
---|---|
Du kan inte kapsla en ForEach-loop i en annan ForEach-loop (eller en Until-loop). | Utforma en pipeline i två nivåer där den yttre pipelinen med den yttre ForEach-loopen itererar över en inre pipeline med den kapslade loopen. |
ForEach-aktiviteten har högst batchCount 50 för parallell bearbetning och högst 100 000 objekt. |
Utforma en pipeline på två nivåer där den yttre pipelinen med ForEach-aktiviteten itererar över en inre pipeline. |
SetVariable kan inte användas i en ForEach-aktivitet som körs parallellt eftersom variablerna är globala för hela pipelinen, de är inte begränsade till en ForEach eller någon annan aktivitet. | Överväg att använda sekventiell ForEach eller använd Kör pipeline i ForEach (variabel/parameter som hanteras i underordnad pipeline). |
Relaterat innehåll
Se andra kontrollflödesaktiviteter som stöds: