Azure Data Factory 和 Azure Synapse Analytics 中的 ForEach 活動
適用於: Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的單一分析解決方案。 Microsoft Fabric 涵蓋從數據移動到數據科學、即時分析、商業智慧和報告等所有專案。 瞭解如何 免費啟動新的試用版 !
ForEach 活動會在 Azure Data Factory 或 Synapse 管線中定義重複的控制流程。 此活動用於逐一查看整個集合,然後以迴圈執行指定的活動。 此活動的迴圈實作與程式設計語言中的 Foreach 迴圈結構相似。
使用 UI 建立 ForEach 活動
若要在管線中使用 ForEach 活動,請完成下列步驟:
您可以使用任何數位類型變數或 來自其他活動的 輸出作為 ForEach 活動的輸入。 若要建立數位變數,請選取管線畫布的背景,然後選取 [變數 ] 索引標籤以新增數位類型變數,如下所示。
在管線 [活動] 窗格中搜尋 ForEach,並將 ForEach 活動拖曳至管線畫布。
如果尚未選取,請在畫布上選取新的 ForEach 活動,以及其 設定 索引標籤,以編輯其詳細數據。
選取 [ 專案 ] 字段,然後選取 [新增動態內容 ] 鏈接以開啟動態內容編輯器窗格。
選取要篩選在動態內容編輯器中的輸入數位。 在此範例中,我們會選取在第一個步驟中建立的變數。
選取 ForEach 活動上的 [活動編輯器],為輸入 Items 陣列中的每個專案新增一或多個要執行的活動。
在 ForEach 活動內建立的任何活動中,您可以從 [專案] 列表參考 ForEach 活動正在逐一查看的目前專案。 您可以在任何位置參考目前的專案,您可以使用動態運算式來指定屬性值。 在動態內容編輯器中,選取 ForEach 反覆運算器以傳回目前的專案。
語法
本文稍後會說明這些屬性。 items 屬性是集合,而且集合中的每個專案都是使用 @item()
,如下列語法所示:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
類型屬性
屬性 | 說明 | 允許的值 | 必要 |
---|---|---|---|
NAME | for-each 活動的名稱。 | String | Yes |
type | 必須設定為 ForEach | String | Yes |
isSequential | 指定循環應該循序或平行執行。 最多可以平行執行 50 個循環反覆專案。 例如,如果您有 ForEach 活動逐一查看具有 10 個不同來源和接收數據集 且 isSequential 設定為 False 的複製活動,則會一次執行所有複本。 預設值是 False。 如果 「isSequential」 設定為 False,請確定有正確的組態可執行多個可執行檔。 否則,應謹慎使用這個屬性,以避免產生寫入衝突。 如需詳細資訊,請參閱 平行執行 一節。 |
布林值 | 否。 預設值是 False。 |
batchCount | 用於控制平行執行的批次計數(當 isSequential 設定為 false 時)。 這是上限並行限制,但 for-each 活動不一定會在這個數位上執行 | 整數 (最大值 50) | 否。 預設值為 20。 |
項目 | 傳回要逐一查看之 JSON 陣列的運算式。 | 運算式 (傳回 JSON 陣列) | Yes |
活動 | 要執行的活動。 | 活動清單 | Yes |
平行執行
如果 isSequential 設定為 false,則活動會以最多 50 個並行反覆專案平行反覆運算。 此設定應謹慎使用。 如果並行反覆專案寫入相同的資料夾,但寫入到不同的檔案,這個方法就沒問題了。 如果並行反覆項目同時寫入至完全相同的檔案,這個方法很可能會造成錯誤。
反覆項目表達式語言
在 ForEach 活動中,提供要逐一查看屬性 項目的陣列。」用來 @item()
逐一查看 ForEach 活動中的單一列舉。 例如,如果 專案 是陣列:[1,2,3],在第一個反覆運算中傳回 1, @item()
第二個反覆運算中傳回 2,在第三個反覆運算中傳回 3。 您也可以使用 @range(0,10)
like 運算式,以 0 結束 9 開始反覆運算十次。
逐一查看單一活動
案例: 從 Azure Blob 中的相同來源檔案複製到 Azure Blob 中的多個目的地檔案。
管線定義
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Blob 資料集定義
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
執行參數值
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
逐一查看多個活動
在 ForEach 活動中,可以逐一查看多個活動(例如:複製和 Web 活動)。 在此案例中,我們建議您將多個活動抽象化成個別的管線。 然後,您可以使用 管線中的 ExecutePipeline 活動搭配 ForEach 活動 來叫用具有多個活動的個別管線。
語法
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
範例
案例: 使用執行管線活動逐一查看 ForEach 活動內的 InnerPipeline。 內部管線會以參數化架構定義來複製。
主要管線定義
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
內部管線定義
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
來源數據集定義
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
接收數據集定義
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
主要管線參數
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
匯總輸出
若要匯總 foreach 活動的輸出,請利用變數和附加變數活動。
首先,在管線中宣告 array
變數 。 然後,在每個 foreach 循環內叫用 Append Variable 活動。 接著,您可以從陣列擷取匯總。
限制和因應措施
以下是 ForEach 活動的一些限制和建議的因應措施。
限制 | 因應措施 |
---|---|
您無法將 ForEach 循環巢狀於另一個 ForEach 循環內(或 Until 迴圈)。 | 設計兩層管線,其中具有外部 ForEach 循環的外部管線會逐一查看具有巢狀循環的內部管線。 |
ForEach 活動最多 batchCount 50 個用於平行處理,最多 100,000 個專案。 |
設計兩層管線,其中外部管線與 ForEach 活動逐一查看內部管線。 |
SetVariable 無法在平行執行的 ForEach 活動內使用,因為變數是全域至整個管線,它們不會限定為 ForEach 或任何其他活動。 | 請考慮使用循序 ForEach,或在 ForEach 內使用執行管線(在子管線中處理的變數/參數)。 |
相關內容
請參閱其他支援的控制流程活動: