Azure Data Factory 和 Azure Synapse Analytics 中的 ForEach 活動

適用於: Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的單一分析解決方案。 Microsoft Fabric 涵蓋從數據移動到數據科學、即時分析、商業智慧和報告等所有專案。 瞭解如何 免費啟動新的試用版

ForEach 活動會在 Azure Data Factory 或 Synapse 管線中定義重複的控制流程。 此活動用於逐一查看整個集合,然後以迴圈執行指定的活動。 此活動的迴圈實作與程式設計語言中的 Foreach 迴圈結構相似。

使用 UI 建立 ForEach 活動

若要在管線中使用 ForEach 活動,請完成下列步驟:

  1. 您可以使用任何數位類型變數或 來自其他活動的 輸出作為 ForEach 活動的輸入。 若要建立數位變數,請選取管線畫布的背景,然後選取 [變數 ] 索引標籤以新增數位類型變數,如下所示。

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. 在管線 [活動] 窗格中搜尋 ForEach,並將 ForEach 活動拖曳至管線畫布。

  3. 如果尚未選取,請在畫布上選取新的 ForEach 活動,以及其 設定 索引標籤,以編輯其詳細數據。

    Shows the UI for a Filter activity.

  4. 選取 [ 專案 ] 字段,然後選取 [新增動態內容 ] 鏈接以開啟動態內容編輯器窗格。

    Shows the  Add dynamic content  link for the Items property.

  5. 選取要篩選在動態內容編輯器中的輸入數位。 在此範例中,我們會選取在第一個步驟中建立的變數。

    Shows the dynamic content editor with the variable created in the first step selected

  6. 選取 ForEach 活動上的 [活動編輯器],為輸入 Items 陣列中的每個專案新增一或多個要執行的活動。

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. 在 ForEach 活動內建立的任何活動中,您可以從 [專案] 列表參考 ForEach 活動正在逐一查看的目前專案。 您可以在任何位置參考目前的專案,您可以使用動態運算式來指定屬性值。 在動態內容編輯器中,選取 ForEach 反覆運算器以傳回目前的專案。

    Shows the dynamic content editor with the ForEach iterator selected.

語法

本文稍後會說明這些屬性。 items 屬性是集合,而且集合中的每個專案都是使用 @item() ,如下列語法所示:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

類型屬性

屬性 說明 允許的值 必要
NAME for-each 活動的名稱。 String Yes
type 必須設定為 ForEach String Yes
isSequential 指定循環應該循序或平行執行。 最多可以平行執行 50 個循環反覆專案。 例如,如果您有 ForEach 活動逐一查看具有 10 個不同來源和接收數據集 且 isSequential 設定為 False 的複製活動,則會一次執行所有複本。 預設值是 False。

如果 「isSequential」 設定為 False,請確定有正確的組態可執行多個可執行檔。 否則,應謹慎使用這個屬性,以避免產生寫入衝突。 如需詳細資訊,請參閱 平行執行 一節。
布林值 否。 預設值是 False。
batchCount 用於控制平行執行的批次計數(當 isSequential 設定為 false 時)。 這是上限並行限制,但 for-each 活動不一定會在這個數位上執行 整數 (最大值 50) 否。 預設值為 20。
項目 傳回要逐一查看之 JSON 陣列的運算式。 運算式 (傳回 JSON 陣列) Yes
活動 要執行的活動。 活動清單 Yes

平行執行

如果 isSequential 設定為 false,則活動會以最多 50 個並行反覆專案平行反覆運算。 此設定應謹慎使用。 如果並行反覆專案寫入相同的資料夾,但寫入到不同的檔案,這個方法就沒問題了。 如果並行反覆項目同時寫入至完全相同的檔案,這個方法很可能會造成錯誤。

反覆項目表達式語言

在 ForEach 活動中,提供要逐一查看屬性 項目的陣列。」用來 @item() 逐一查看 ForEach 活動中的單一列舉。 例如,如果 專案 是陣列:[1,2,3],在第一個反覆運算中傳回 1, @item() 第二個反覆運算中傳回 2,在第三個反覆運算中傳回 3。 您也可以使用 @range(0,10) like 運算式,以 0 結束 9 開始反覆運算十次。

逐一查看單一活動

案例: 從 Azure Blob 中的相同來源檔案複製到 Azure Blob 中的多個目的地檔案。

管線定義

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Blob 資料集定義

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

執行參數值

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

逐一查看多個活動

在 ForEach 活動中,可以逐一查看多個活動(例如:複製和 Web 活動)。 在此案例中,我們建議您將多個活動抽象化成個別的管線。 然後,您可以使用 管線中的 ExecutePipeline 活動搭配 ForEach 活動 來叫用具有多個活動的個別管線。

語法

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

範例

案例: 使用執行管線活動逐一查看 ForEach 活動內的 InnerPipeline。 內部管線會以參數化架構定義來複製。

主要管線定義

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

內部管線定義

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

來源數據集定義

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

接收數據集定義

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

主要管線參數

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

匯總輸出

若要匯總 foreach 活動的輸出,請利用變數附加變數活動。

首先,在管線中宣告 array變數 。 然後,在每個 foreach 循環內用 Append Variable 活動。 接著,您可以從陣列擷取匯總。

限制和因應措施

以下是 ForEach 活動的一些限制和建議的因應措施。

限制 因應措施
您無法將 ForEach 循環巢狀於另一個 ForEach 循環內(或 Until 迴圈)。 設計兩層管線,其中具有外部 ForEach 循環的外部管線會逐一查看具有巢狀循環的內部管線。
ForEach 活動最多 batchCount 50 個用於平行處理,最多 100,000 個專案。 設計兩層管線,其中外部管線與 ForEach 活動逐一查看內部管線。
SetVariable 無法在平行執行的 ForEach 活動內使用,因為變數是全域至整個管線,它們不會限定為 ForEach 或任何其他活動。 請考慮使用循序 ForEach,或在 ForEach 內使用執行管線(在子管線中處理的變數/參數)。

請參閱其他支援的控制流程活動: