ForEach-aktivitet i Azure Data Factory och Azure Synapse Analytics

GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics

Dricks

Prova Data Factory i Microsoft Fabric, en allt-i-ett-analyslösning för företag. Microsoft Fabric omfattar allt från dataflytt till datavetenskap, realtidsanalys, business intelligence och rapportering. Lär dig hur du startar en ny utvärderingsversion kostnadsfritt!

ForEach-aktiviteten definierar ett upprepat kontrollflöde i en Azure Data Factory- eller Synapse-pipeline. Den här aktiviteten används till att iterera över en samling och kör angivna aktiviteter i en loop. Implementeringen av loopen för den här aktiviteten liknar Foreach-loopstrukturen i programmeringsspråk.

Skapa en ForEach-aktivitet med användargränssnittet

Utför följande steg för att använda en ForEach-aktivitet i en pipeline:

  1. Du kan använda valfri matristypvariabel eller utdata från andra aktiviteter som indata för din ForEach-aktivitet. Om du vill skapa en matrisvariabel väljer du bakgrunden för pipelinearbetsytan och väljer sedan fliken Variabler för att lägga till en matristypvariabel enligt nedan.

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. Sök efter ForEach i fönstret Pipelineaktiviteter och dra en ForEach-aktivitet till pipelinearbetsytan.

  3. Välj den nya ForEach-aktiviteten på arbetsytan om den inte redan är markerad och fliken Inställningar för att redigera dess information.

    Shows the UI for a Filter activity.

  4. Välj fältet Objekt och välj sedan länken Lägg till dynamiskt innehåll för att öppna fönstret redigerare för dynamiskt innehåll.

    Shows the  Add dynamic content  link for the Items property.

  5. Välj den indatamatris som ska filtreras i redigeraren för dynamiskt innehåll. I det här exemplet väljer vi variabeln som skapades i det första steget.

    Shows the dynamic content editor with the variable created in the first step selected

  6. Välj aktivitetsredigeraren i ForEach-aktiviteten för att lägga till en eller flera aktiviteter som ska köras för varje objekt i matrisen för indataobjekt.

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. I alla aktiviteter som du skapar i ForEach-aktiviteten kan du referera till det aktuella objektet som ForEach-aktiviteten itererar igenom från objektlistan. Du kan referera till det aktuella objektet var som helst där du kan använda ett dynamiskt uttryck för att ange ett egenskapsvärde. I redigeraren för dynamiskt innehåll väljer du ForEach-iteratorn för att returnera det aktuella objektet.

    Shows the dynamic content editor with the ForEach iterator selected.

Syntax

Egenskaperna beskrivs senare i den här artikeln. Objektegenskapen är samlingen och varje objekt i samlingen refereras till med hjälp @item() av det som visas i följande syntax:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Typegenskaper

Property beskrivning Tillåtna värden Obligatoriskt
name Namnet på aktiviteten för varje aktivitet. String Ja
type Måste anges till ForEach String Ja
isSequential Anger om loopen ska köras sekventiellt eller parallellt. Maximalt 50 loop-iterationer kan köras samtidigt parallellt). Om du till exempel har en ForEach-aktivitet som itererar över en kopieringsaktivitet med 10 olika käll- och mottagardatauppsättningar med isSequential inställd på False, körs alla kopior samtidigt. Standardvärdet är Falskt.

Om "isSequential" är inställt på False kontrollerar du att det finns en korrekt konfiguration för att köra flera körbara filer. I annat fall bör den här egenskapen användas med försiktighet för att undvika skrivkonflikter. Mer information finns i avsnittet Parallell körning .
Booleskt Nej. Standardvärdet är Falskt.
batchCount Batchantal som ska användas för att kontrollera antalet parallella körningar (när isSequential är inställt på false). Det här är den övre samtidighetsgränsen, men för varje aktivitet körs inte alltid vid det här talet Heltal (högst 50) Nej. Standardvärdet är 20.
Artiklar Ett uttryck som returnerar en JSON-matris som ska itereras över. Uttryck (som returnerar en JSON-matris) Ja
Aktiviteter De aktiviteter som ska köras. Lista över aktiviteter Ja

Parallell körning

Om isSequential är inställt på false itereras aktiviteten parallellt med högst 50 samtidiga iterationer. Den här inställningen bör användas med försiktighet. Om samtidiga iterationer skrivs till samma mapp men till olika filer är den här metoden bra. Om samtidiga iterationer skrivs samtidigt till exakt samma fil orsakar den här metoden troligen ett fel.

Iterationsuttrycksspråk

I ForEach-aktiviteten anger du en matris som ska itereras över för egenskapsobjekten." Använd @item() för att iterera över en enda uppräkning i ForEach-aktivitet. Om objekt till exempel är en matris: [1, 2, 3], @item() returnerar 1 i den första iterationen, 2 i den andra iterationen och 3 i den tredje iterationen. Du kan också använda @range(0,10) like-uttryck för att iterera tio gånger från och med 0 som slutar med 9.

Iterera över en enskild aktivitet

Scenario: Kopiera från samma källfil i Azure Blob till flera målfiler i Azure Blob.

Pipelinedefinition

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definition av blobdatauppsättning

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Kör parametervärden

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Iterera över flera aktiviteter

Det går att iterera över flera aktiviteter (till exempel kopiera och webbaktiviteter) i en ForEach-aktivitet. I det här scenariot rekommenderar vi att du abstraherar flera aktiviteter till en separat pipeline. Sedan kan du använda aktiviteten ExecutePipeline i pipelinen med ForEach-aktiviteten för att anropa den separata pipelinen med flera aktiviteter.

Syntax

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Exempel

Scenario: Iterera över en InnerPipeline inom en ForEach-aktivitet med Aktiviteten Kör pipeline. Den inre pipelinen kopieras med schemadefinitioner parametriserade.

Huvudpipelinedefinition

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definition av inre pipeline

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definition av källdatauppsättning

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Datauppsättningsdefinition för mottagare

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Huvudpipelineparametrar

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Aggregera utdata

Om du vill aggregera utdata från foreach-aktivitet använder du aktiviteten Variabler och Tilläggsvariabel .

Deklarera först en arrayvariabel i pipelinen. Anropa sedan aktiviteten Lägg till variabel i varje foreach-loop. Därefter kan du hämta aggregeringen från matrisen.

Begränsningar och lösningar

Här följer några begränsningar för ForEach-aktiviteten och föreslagna lösningar.

Begränsning Lösning
Du kan inte kapsla en ForEach-loop i en annan ForEach-loop (eller en Until-loop). Utforma en pipeline i två nivåer där den yttre pipelinen med den yttre ForEach-loopen itererar över en inre pipeline med den kapslade loopen.
ForEach-aktiviteten har högst batchCount 50 för parallell bearbetning och högst 100 000 objekt. Utforma en pipeline på två nivåer där den yttre pipelinen med ForEach-aktiviteten itererar över en inre pipeline.
SetVariable kan inte användas i en ForEach-aktivitet som körs parallellt eftersom variablerna är globala för hela pipelinen, de är inte begränsade till en ForEach eller någon annan aktivitet. Överväg att använda sekventiell ForEach eller använd Kör pipeline i ForEach (variabel/parameter som hanteras i underordnad pipeline).

Se andra kontrollflödesaktiviteter som stöds: