How does the ForEach activity schedule its iterations in parallel?

Andris Veidemanis
2023-07-12T16:13:08.7066667+00:00

Hello,

Maybe this has changed, but I remember ADF iterating over an array a bit differently than it does now.
Example pipeline:

{
    "name": "ForEach",
    "properties": {
        "activities": [
            {
                "name": "ForEach",
                "type": "ForEach",
                "dependsOn": [],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@variables('v_foreach')",
                        "type": "Expression"
                    },
                    "isSequential": false,
                    "batchCount": 3,
                    "activities": [
                        {
                            "name": "Wait",
                            "type": "Wait",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "waitTimeInSeconds": {
                                    "value": "@item().WaitTime",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "variables": {
            "v_foreach": {
                "type": "Array",
                "defaultValue": [
                    {
                        "No": 1,
                        "WaitTime": 60
                    },
                    {
                        "No": 2,
                        "WaitTime": 1
                    },
                    {
                        "No": 3,
                        "WaitTime": 1
                    },
                    {
                        "No": 4,
                        "WaitTime": 60
                    },
                    {
                        "No": 5,
                        "WaitTime": 1
                    },
                    {
                        "No": 6,
                        "WaitTime": 1
                    },
                    {
                        "No": 7,
                        "WaitTime": 60
                    }
                ]
            }
        },
        "annotations": []
    }
}

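I expected this pipeline to run for 63 seconds (ignoring ADF pipeline spin-up time and similar overhead). In the tables below, each cell shows the item running in that slot and its remaining wait time: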

Time    Slot1                     Slot2                     Slot3
0 sec   "No": 1, "WaitTime": 60   "No": 2, "WaitTime": 1    "No": 3, "WaitTime": 1
1 sec   "No": 1, "WaitTime": 59   "No": 4, "WaitTime": 60   "No": 5, "WaitTime": 1
2 sec   "No": 1, "WaitTime": 58   "No": 4, "WaitTime": 59   "No": 6, "WaitTime": 1
3 sec   "No": 1, "WaitTime": 57   "No": 4, "WaitTime": 58   "No": 7, "WaitTime": 60
...     ...                       ...                       ...
62 sec  <empty>                   <empty>                   "No": 7, "WaitTime": 1
63 sec  <empty>                   <empty>                   <empty>
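To make the expected timeline concrete, here is a minimal Python sketch of the scheduling I assumed: a shared work queue in which any slot that becomes free immediately takes the next item in array order. The function name `work_queue_makespan` is hypothetical, and the model ignores ADF start-up overhead.

```python
import heapq


def work_queue_makespan(durations, slots):
    """Hypothetical model of the expected behaviour: a shared work queue.

    Items are taken in array order; each one starts on whichever slot
    becomes free first. Returns (makespan, [(item_no, start, end), ...]).
    """
    free_at = [0] * slots  # time at which each slot becomes free (already a valid heap)
    schedule = []
    for no, duration in enumerate(durations, start=1):
        start = heapq.heappop(free_at)  # earliest free slot takes the next item
        end = start + duration
        heapq.heappush(free_at, end)
        schedule.append((no, start, end))
    return max(free_at), schedule


wait_times = [60, 1, 1, 60, 1, 1, 60]  # WaitTime of items No 1..7
makespan, schedule = work_queue_makespan(wait_times, slots=3)
for no, start, end in schedule:
    print(f"No {no}: {start:>3} s -> {end:>3} s")
print("total:", makespan, "s")  # total: 63 s
```

Under this model items 4, 5, 6 and 7 start at 1, 1, 2 and 3 seconds, matching the table above.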

But in reality, it runs like this:

Time     Slot1                     Slot2                     Slot3
0 sec    "No": 1, "WaitTime": 60   "No": 2, "WaitTime": 1    "No": 3, "WaitTime": 1
1 sec    "No": 1, "WaitTime": 59   "No": 5, "WaitTime": 1    "No": 6, "WaitTime": 1
2 sec    "No": 1, "WaitTime": 58   <empty>                   <empty>
...      ...                       ...                       ...
60 sec   "No": 4, "WaitTime": 60   <empty>                   <empty>
...      ...                       ...                       ...
120 sec  "No": 7, "WaitTime": 60   <empty>                   <empty>
...      ...                       ...                       ...
179 sec  "No": 7, "WaitTime": 1    <empty>                   <empty>
180 sec  <empty>                   <empty>                   <empty>
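The observed timeline is consistent with a different model: the items appear to be split up front into `batchCount` lanes by position (items 1, 4, 7 in one lane; 2, 5 in another; 3, 6 in the third), and each lane runs its items one after another even while other lanes sit idle. The Python sketch below reproduces the 180-second total under that assumption. It is inferred from the timings above, not taken from ADF documentation or source code, and `round_robin_makespan` is a hypothetical name.

```python
def round_robin_makespan(durations, batch_count):
    """Hypothetical model of the observed behaviour: static round-robin lanes.

    Item i (0-based) is assigned up front to lane i % batch_count, and each
    lane runs its items one after another, even if other lanes are idle.
    Returns (makespan, [(item_no, lane_no, start, end), ...]).
    """
    lane_free_at = [0] * batch_count  # time at which each lane finishes its current item
    schedule = []
    for i, duration in enumerate(durations):
        lane = i % batch_count  # fixed assignment by position, not by availability
        start = lane_free_at[lane]
        end = start + duration
        lane_free_at[lane] = end
        schedule.append((i + 1, lane + 1, start, end))
    return max(lane_free_at), schedule


wait_times = [60, 1, 1, 60, 1, 1, 60]  # WaitTime of items No 1..7
makespan, schedule = round_robin_makespan(wait_times, batch_count=3)
for no, lane, start, end in schedule:
    print(f"No {no} (lane {lane}): {start:>3} s -> {end:>3} s")
print("total:", makespan, "s")  # total: 180 s
```

The three 60-second items all land in lane 1, so that lane alone takes 3 × 60 = 180 seconds, while the work-queue model would finish in 63.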

In this example the difference is 117 seconds (180 s versus the expected 63 s), which on its own is negligible, but in my real pipeline it is closer to two hours.

I understand that I could plan around this based on past run times, but activity durations vary from run to run.

Is there a way to make ForEach behave like the first example? And has the ForEach activity always worked this way?

Thanks!

Azure Data Factory

1 answer

  1. Andris Veidemanis
    2023-07-13T08:59:34.5266667+00:00

    Workaround:

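    1. Move the ForEach inner activity into a separate (child) pipeline and call it from the ForEach with an Execute Pipeline activity.
    2. Set the concurrency limit of that child pipeline to 3 (the desired number of parallel slots).
    3. Run the ForEach in parallel (isSequential: false) with a batchCount larger than the total number of items to iterate over.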

    JSON for the workaround (child pipeline "Wait" first, then the parent pipeline "ForEach"):

    {
        "name": "Wait",
        "properties": {
            "activities": [
                {
                    "name": "Wait",
                    "type": "Wait",
                    "dependsOn": [],
                    "userProperties": [],
                    "typeProperties": {
                        "waitTimeInSeconds": {
                            "value": "@pipeline().parameters.p_WaitTime",
                            "type": "Expression"
                        }
                    }
                }
            ],
            "concurrency": 3,
            "parameters": {
                "p_WaitTime": {
                    "type": "int"
                }
            },
            "annotations": []
        }
    }
    
    {
        "name": "ForEach",
        "properties": {
            "activities": [
                {
                    "name": "ForEach",
                    "type": "ForEach",
                    "dependsOn": [],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@variables('v_foreach')",
                            "type": "Expression"
                        },
                        "isSequential": false,
                        "batchCount": 10,
                        "activities": [
                            {
                                "name": "Wait",
                                "type": "ExecutePipeline",
                                "dependsOn": [],
                                "userProperties": [],
                                "typeProperties": {
                                    "pipeline": {
                                        "referenceName": "Wait",
                                        "type": "PipelineReference"
                                    },
                                    "waitOnCompletion": true,
                                    "parameters": {
                                        "p_WaitTime": {
                                            "value": "@item().WaitTime",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            }
                        ]
                    }
                }
            ],
            "variables": {
                "v_foreach": {
                    "type": "Array",
                    "defaultValue": [
                        {
                            "No": 1,
                            "WaitTime": 60
                        },
                        {
                            "No": 2,
                            "WaitTime": 1
                        },
                        {
                            "No": 3,
                            "WaitTime": 1
                        },
                        {
                            "No": 4,
                            "WaitTime": 60
                        },
                        {
                            "No": 5,
                            "WaitTime": 1
                        },
                        {
                            "No": 6,
                            "WaitTime": 1
                        },
                        {
                            "No": 7,
                            "WaitTime": 60
                        }
                    ]
                }
            },
            "annotations": []
        }
    }
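A rough way to see why the workaround helps: when batchCount is at least as large as the number of items, every item gets its own ForEach lane, so all child runs are submitted at once and the child pipeline's `concurrency` limit of 3 holds the extra runs in a queue, which behaves like a shared work queue. The Python sketch below models both layers. The round-robin lane assignment and the first-in, first-out order of queued runs are assumptions, not documented ADF behaviour, and `workaround_makespan` is a hypothetical name.

```python
import heapq
from collections import deque


def workaround_makespan(durations, batch_count, concurrency):
    """Hypothetical two-layer model of the workaround.

    Layer 1 (ForEach): item i (0-based) goes to lane i % batch_count, and a lane
    submits its next item only after its previous child run has finished
    (waitOnCompletion = true).
    Layer 2 (child pipeline): at most `concurrency` runs execute at once; extra
    runs wait in a queue that is assumed to be first-in, first-out.
    Returns (makespan, {item_no: start_time}).
    """
    if batch_count < 1 or concurrency < 1:
        raise ValueError("batch_count and concurrency must be >= 1")
    lanes = [deque(range(lane, len(durations), batch_count))
             for lane in range(batch_count)]
    queued = deque()  # child runs waiting for a free concurrency slot
    running = []      # min-heap of (end_time, item_index)
    starts = {}
    now = 0

    # At t = 0 every non-empty lane submits its first item.
    for lane in lanes:
        if lane:
            queued.append(lane.popleft())

    while queued or running:
        # Start queued runs while the child pipeline has free slots.
        while queued and len(running) < concurrency:
            item = queued.popleft()
            starts[item + 1] = now
            heapq.heappush(running, (now + durations[item], item))
        # Jump to the next completion; that item's lane submits its next item.
        now, item = heapq.heappop(running)
        lane = lanes[item % batch_count]
        if lane:
            queued.append(lane.popleft())
    return now, starts


wait_times = [60, 1, 1, 60, 1, 1, 60]  # WaitTime of items No 1..7
print(workaround_makespan(wait_times, batch_count=10, concurrency=3)[0])  # 63
print(workaround_makespan(wait_times, batch_count=3, concurrency=3)[0])   # 180
```

Under this model, dropping batchCount back to 3 returns the 180-second result, because each lane again waits for its own previous item before submitting the next one.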
    
