Aktivita ForEach ve službě Azure Data Factory a Azure Synapse Analytics

PLATÍ PRO: Azure Data Factory Azure Synapse Analytics

Tip

Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.

Aktivita ForEach definuje opakující se tok řízení v kanálu Azure Data Factory nebo Synapse. Tato aktivita se používá k opakování v kolekci a spouští zadané aktivity ve smyčce. Implementace smyčky této aktivity se podobá struktuře smyčky Foreach v programovacích jazycích.

Vytvoření aktivity ForEach pomocí uživatelského rozhraní

Pokud chcete v kanálu použít aktivitu ForEach, proveďte následující kroky:

  1. Jako vstup pro aktivitu ForEach můžete použít libovolnou proměnnou typu pole nebo výstupy z jiných aktivit . Pokud chcete vytvořit maticovou proměnnou, vyberte pozadí plátna kanálu a pak výběrem karty Proměnné přidejte proměnnou typu pole, jak je znázorněno níže.

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. Vyhledejte forEach v podokně Aktivity kanálu a přetáhněte aktivitu ForEach na plátno kanálu.

  3. Vyberte novou aktivitu ForEach na plátně, pokud ještě není vybraná, a její Nastavení kartu pro úpravu podrobností.

    Shows the UI for a Filter activity.

  4. Vyberte pole Položky a pak výběrem odkazu Přidat dynamický obsah otevřete podokno editoru dynamického obsahu.

    Shows the  Add dynamic content  link for the Items property.

  5. Vyberte vstupní pole, které chcete filtrovat v editoru dynamického obsahu. V tomto příkladu vybereme proměnnou vytvořenou v prvním kroku.

    Shows the dynamic content editor with the variable created in the first step selected

  6. Vyberte editor aktivit v aktivitě ForEach a přidejte jednu nebo více aktivit, které se mají spustit pro každou položku ve vstupním poli Položky .

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. Ve všech aktivitách, které vytvoříte v rámci aktivity ForEach, můžete odkazovat na aktuální položku, kterou aktivita ForEach iteruje ze seznamu Položek . Na aktuální položku můžete odkazovat kdekoli, kde můžete zadat hodnotu vlastnosti pomocí dynamického výrazu. V editoru dynamického obsahu vyberte iterátor ForEach a vraťte aktuální položku.

    Shows the dynamic content editor with the ForEach iterator selected.

Syntaxe

Vlastnosti jsou popsány dále v tomto článku. Vlastnost items je kolekce a každá položka v kolekci je označována pomocí @item() , jak je znázorněno v následující syntaxi:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Vlastnosti typu

Vlastnost Popis Povolené hodnoty Požaduje se
name Název jednotlivých aktivit. String Ano
type Musí být nastavená hodnota ForEach. String Ano
isSequential Určuje, jestli se má smyčka spouštět postupně nebo paralelně. Paralelně lze spustit maximálně 50 iterací smyčky najednou). Pokud máte například aktivitu ForEach, která iteruje aktivitu kopírování s 10 různými zdrojovými datovými sadami a datovými sadami jímky s hodnotou IsSequential nastavenou na Hodnotu False, spustí se všechny kopie najednou. Výchozí hodnota je False.

Pokud je hodnota isSequential nastavená na False, ujistěte se, že existuje správná konfigurace pro spouštění více spustitelných souborů. Jinak by tato vlastnost měla být použita s opatrností, aby nedocházelo ke konfliktům zápisu. Další informace naleznete v části Paralelní spuštění .
Logická hodnota Ne. Výchozí hodnota je False.
batchCount Počet dávek, který se má použít pro řízení počtu paralelního spuštění (pokud je hodnota isSequential nastavená na hodnotu false). Jedná se o horní limit souběžnosti, ale u každé aktivity se nespustí vždy na tomto čísle. Celé číslo (maximálně 50) Ne. Výchozí hodnota je 20.
Items Výraz, který vrátí pole JSON, které se má iterated převést. Výraz (který vrací pole JSON) Ano
Aktivity Aktivity, které se mají provést. Seznam aktivit Ano

Paralelní spouštění

Pokud je hodnota isSequential nastavena na false, aktivita iteruje paralelně s maximálně 50 souběžnými iteracemi. Toto nastavení by se mělo používat s opatrností. Pokud souběžné iterace zapisují do stejné složky, ale do různých souborů, je tento přístup v pořádku. Pokud souběžné iterace zapisují souběžně do stejného souboru, pravděpodobně tento přístup způsobí chybu.

Jazyk výrazů iterace

V aktivitě ForEach zadejte pole, které má být iterated pro položky vlastnosti." Slouží @item() k iteraci jednoho výčtu v aktivitě ForEach. Pokud jsou například položky matice: [1, 2, 3], @item() vrátí hodnotu 1 v první iteraci, 2 ve druhé iteraci a 3 ve třetí iteraci. Můžete také použít @range(0,10) podobný výraz k iteraci desetkrát od 0 do 9.

Iterace přes jednu aktivitu

Scénář: Zkopírujte ze stejného zdrojového souboru v Objektu blob Azure do více cílových souborů v Objektu blob Azure.

Definice kanálu

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definice datové sady objektů blob

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Spuštění hodnot parametrů

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Iterace více aktivit

V aktivitě ForEach je možné iterovat více aktivit (například kopírování a webové aktivity). V tomto scénáři doporučujeme abstrahovat více aktivit do samostatného kanálu. Potom můžete použít aktivitu ExecutePipeline v kanálu s aktivitou ForEach k vyvolání samostatného kanálu s více aktivitami.

Syntaxe

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Příklad

Scénář: Iterace přes InnerPipeline v rámci aktivity ForEach s aktivitou Execute Pipeline. Vnitřní kanál kopíruje s parametrizovanými definicemi schématu.

Definice hlavního kanálu

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definice vnitřního kanálu

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definice zdrojové datové sady

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definice datové sady jímky

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parametry hlavního kanálu

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Agregace výstupů

Pokud chcete agregovat výstupy aktivity foreach, využijte proměnné a aktivitu přidávací proměnné.

Nejprve deklarujte proměnnou array v kanálu. Potom v každé smyčce foreach vyvoláte aktivitu přidávací proměnné. Následně můžete agregaci načíst z pole.

Omezení a zástupná řešení

Tady jsou některá omezení aktivity ForEach a navrhovaná alternativní řešení.

Omezení Alternativní řešení
Smyčku ForEach nelze vnořit do jiné smyčky ForEach (nebo do smyčky Until). Navrhňte dvouúrovňový kanál, ve kterém vnější kanál s vnější smyčkou ForEach iteruje vnitřní kanál s vnořenou smyčkou.
Aktivita ForEach má maximálně batchCount 50 pro paralelní zpracování a maximálně 100 000 položek. Navrhňte dvouúrovňový kanál, ve kterém se vnější kanál s aktivitou ForEach iteruje přes vnitřní kanál.
SetVariable nelze použít uvnitř aktivity ForEach, která běží paralelně, protože proměnné jsou globální pro celý kanál, nejsou vymezeny na ForEach ani na žádnou jinou aktivitu. Zvažte použití sekvenčního příkazu ForEach nebo použití spuštění kanálu uvnitř forEach (proměnná nebo parametr zpracovaný v podřízené kanálu).

Projděte si další podporované aktivity toku řízení: