Azure Data Factory 和 Azure Synapse Analytics 中的 ForEach 活動
適用於:Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用!
ForEach 活動會在 Azure Data Factory 或 Synapse 管線中定義重複的控制流程。 此活動用於逐一查看整個集合,然後以迴圈執行指定的活動。 此活動的迴圈實作與程式設計語言中的 Foreach 迴圈結構相似。
使用 UI 建立 ForEach 活動
若要在管線中使用 ForEach 活動,請完成下列步驟:
您可以使用任何陣列類型變數或來自其他活動的輸出作為 ForEach 活動的輸入。 若要建立陣列變數,請選取管線畫布的背景,然後選取 [變數] 索引標籤,以新增陣列類型變數,如下所示。
在管線 [活動] 窗格中搜尋 ForEach,然後將 ForEach 活動拖曳至管線畫布。
在畫布上選取新的 ForEach 活動 (如未選取) 和其 [設定] 索引標籤以編輯詳細資料。
選取 [項目] 欄位,然後選取 [新增動態內容] 連結以開啟動態內容編輯器窗格。
選取您要在動態內容編輯器中篩選的輸入陣列。 在此範例中,我們會選取在第一個步驟中建立的變數。
選取 ForEach 活動上的活動編輯器,為輸入項目陣列中的每個項目新增一或多個要執行的活動。
在您於 ForEach 活動內建立的任何活動中,您可以參考 ForEach 活動從項目清單中逐一查看的目前項目。 您可以在使用動態運算式來指定屬性值的任何位置參考目前的項目。 在動態內容編輯器中,選取 ForEach 迭代器來傳回目前的項目。
語法
本文稍後將說明這些屬性。 項目屬性為集合,而集合中的每個項目則使用 @item()
表示,如下列語法所示:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
類型屬性
屬性 | 說明 | 允許的值 | 必要 |
---|---|---|---|
NAME | for-each 活動的名稱。 | String | Yes |
type | 必須設定為 ForEach | String | Yes |
isSequential | 指定應該循序或以平行方式執行迴圈。 以平行方式可一次執行最多 50 個迴圈反覆項目。 例如,如果您的 ForEach 活動會反覆查詢 10 個不同來源和接收資料集的複製活動,且 isSequential 設為 False,則所有複本會都執行一次。 預設值是 False。 如果 isSequential 設定為 False,請確認有正確的設定可執行多個可執行檔。 否則,應謹慎使用這個屬性,以避免引發寫入衝突。 如需詳細資訊,請參閱平行執行一節。 |
布林值 | 否。 預設值是 False。 |
batchCount | 批次計數,用於控制平行執行的數目 (當 isSequential 設定為 false 時)。 這是並行上限,但 for-each 活動不一定會以此數目執行 | 整數 (最大值 50) | 否。 預設為 20。 |
項目 | 傳回要反覆查詢之 JSON 陣列的運算式。 | 運算式 (傳回 JSON 陣列) | Yes |
活動 | 要執行的活動。 | 活動清單 | Yes |
平行執行
如果 isSequential 設為 false,活動會以平行方式逐一查看,並行的反覆項目數最多為 50。 此設定應謹慎使用。 如果並行的反覆項目會寫入相同資料夾的不同檔案,這種方法是正常的。 如果並行的反覆項目會同時寫入相同的檔案,這種方法很可能會導致錯誤。
反覆項目運算式語言
在 ForEach 活動中,為屬性項目提供可反覆查詢的陣列。使用 @item()
反覆查詢 ForEach 活動中的單一列舉。 例如,如果項目是陣列:[1, 2, 3],@item()
在第一個反覆項目中會傳回 1,在第二個反覆項目中會傳回 2,在第三個反覆項目中會傳回 3。 您也可以使用 @range(0,10)
之類的運算式,逐一查看十個項目,以 0 開始以 9 結束。
反覆查詢單一活動
案例:從 Azure Blob 中相同的來源檔案複製到 Azure Blob 中的多個目的地檔案。
管線定義
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Blob 資料集定義
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
執行參數值
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
反覆查詢多個活動
可反覆查詢 ForEach 活動中的多個活動 (例如:複製和 Web 活動)。 在此案例中,我們建議您將多個活動提取至不同的管線。 然後,您可以在管線中使用 ExecutePipeline 活動,以 ForEach 活動叫用不同的管線來搭配多個活動。
語法
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
範例
案例:使用執行管線活動反覆查詢 ForEach 活動內的 InnerPipeline。 內部管線使用參數化的結構描述定義複製。
主要管線定義
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
內部管線定義
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
來源資料集定義
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
接收資料集定義
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
主要管線參數
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
彙總輸出
若要彙總 foreach 活動的輸出,請利用 Variable 與 Append Variable 活動。
首先,在管線中宣告 array
variable。 然後在每個 foreach 迴圈內叫用 Append Variable 活動。 接著,也可以從您的陣列中擷取彙總。
限制和因應措施
以下是 ForEach 活動和建議因應措施的一些限制。
限制 | 因應措施 |
---|---|
您無法將 ForEach 迴圈內嵌在其他 ForEach 迴圈 (或 Until 迴圈) 內。 | 設計兩個層級的管線,其中外部管線具有外部 ForEach 迴圈,會逐一查看具有巢狀迴圈的內部管線。 |
ForEach 活動的平行處理 batchCount 上限為 50,最多可處理 100,000 個項目。 |
設計兩個層級的管線,其中外部管線具有 ForEach 活動,會逐一查看內部管線。 |
SetVariable 無法在平行執行的 ForEach 活動內使用,因為變數是整個管線的全域變數,其範圍不會限定為 ForEach 或任何其他活動。 | 請考慮使用循序 ForEach,或在 ForEach 內使用執行管線 (子管線中處理的變數/參數)。 |
相關內容
查看其他支援的控制流程活動: