Passing multiple files as parameters to a Databricks notebook (one file at a time)

Vinay5 46 Reputation points
2021-05-13T16:57:10.893+00:00

Hello,

I have n files in a nested folder structure, as shown below.

data/year/Month/Day/time
File1.xml
File2.xml
... File..n.xml.

I have to pass each file as a parameter to a Databricks notebook, one after the other.
For instance, once File1 has been passed as a parameter and the notebook run completes, we need to pass File2 as a parameter to the same notebook, dynamically, from ADF.

Example:
For a given day, let's say 2021-05-13, the folder structure will be Sourcename/Year(2021)/month(05)/day(13)/timefolder(1).
Say timefolder(1) contains 7 files; I have to pass each file, one after the other, as a parameter to the Databricks notebook.
Once timefolder(1) is completed, there will be a timefolder(2) containing some files, and these also have to be passed as parameters to the Databricks notebook. In this way, for the given date and its time folders, each file has to be passed to the notebook dynamically. Note: the notebook and the logic in the notebook are constant.

I used the Get Metadata activity and was able to achieve part of this requirement, for the case of one time folder containing one file, but I am unable to design a pipeline for n time folders with n files in each.

Could someone please assist?

Azure Data Factory

2 answers

  1. MartinJaffer-MSFT 26,236 Reputation points
    2021-05-14T18:52:48.063+00:00

    Okay. Let's do this, @Vinay5.

    There are several tools we can use in building a solution. The central one is the forEach loop activity. This takes in an iterable, such as an array parameter or array-type variable, and makes it available to a sub-pipeline-like container of activities. These contained activities reference the current element with @item().

    You mentioned you currently have a partial solution. This probably involves concatenating the folder-path and filename and passing it to the Databricks activity.

    Be aware that pipeline variables are scoped to the entire pipeline. Using a set variable activity inside a forEach loop will overwrite any value set outside the loop. It will also overwrite any value set by another iteration of the loop. This means concurrent (non-sequential) loops can cause a problem called a race condition.
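
    One way to sidestep that race is to run the loop sequentially, which the forEach activity exposes as isSequential. The fragment below is a minimal sketch, not a tested pipeline: the activity names, the fileNames parameter, and the currentFile variable (which would need to be declared as a String under the pipeline's variables) are placeholders invented for illustration.

    ```json
    {
        "name": "ForEachFileSequential",
        "type": "ForEach",
        "dependsOn": [],
        "userProperties": [],
        "typeProperties": {
            "items": {
                "value": "@pipeline().parameters.fileNames",
                "type": "Expression"
            },
            "isSequential": true,
            "activities": [
                {
                    "name": "Set current file",
                    "type": "SetVariable",
                    "dependsOn": [],
                    "userProperties": [],
                    "typeProperties": {
                        "variableName": "currentFile",
                        "value": {
                            "value": "@item()",
                            "type": "Expression"
                        }
                    }
                }
            ]
        }
    }
    ```

    With isSequential set to true, each iteration finishes before the next one starts, so the variable only ever holds the value for the iteration in progress. Where the value is needed by a single activity only, referencing @item() directly in that activity avoids the variable altogether.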

    If the file names are predictable, continuous, and sequential like ["time1","time2","time3","time4"], we can construct them by using @range(1,4) (start at 1, produce 4 numbers) as the forEach loop items, and building each name inside the loop with @concat(pipeline().parameters.folderPath , 'time', string(item()) )
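
    As a rough sketch of how that expression could feed the notebook, the fragment below loops over the range and passes the constructed path to a Databricks Notebook activity. Everything not named in the expression above is an assumption for illustration: the activity names, the notebook path /Shared/process_file, the linked service AzureDatabricks1, and the notebook parameter inputPath. It also assumes folderPath already ends with a slash.

    ```json
    {
        "name": "ForEachTimeFolder",
        "type": "ForEach",
        "dependsOn": [],
        "userProperties": [],
        "typeProperties": {
            "items": {
                "value": "@range(1,4)",
                "type": "Expression"
            },
            "isSequential": true,
            "activities": [
                {
                    "name": "Run notebook",
                    "type": "DatabricksNotebook",
                    "dependsOn": [],
                    "userProperties": [],
                    "typeProperties": {
                        "notebookPath": "/Shared/process_file",
                        "baseParameters": {
                            "inputPath": {
                                "value": "@concat(pipeline().parameters.folderPath, 'time', string(item()))",
                                "type": "Expression"
                            }
                        }
                    },
                    "linkedServiceName": {
                        "referenceName": "AzureDatabricks1",
                        "type": "LinkedServiceReference"
                    }
                }
            ]
        }
    }
    ```

    Because the path is computed inside the activity from @item(), no pipeline variable is involved, so the loop would stay correct even if isSequential were later switched off.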

    If the file names are unpredictable like ["time8:01AM","time12:22PM", "time3:36PM","time9:02PM"] then we will need to get the list of file names via GetMetadata childItems. This list will be passed to the forEach loop items.

    That was the simplest solution.

    If you want to make this more modular, or need to iterate over multiple levels of folders, then breaking the work up into multiple pipelines is a solution. This is in part because you cannot nest one forEach loop inside another forEach loop. The parent pipeline gets the metadata and iterates over it, each time calling a child pipeline using the Execute Pipeline activity. The child pipeline can have its own loop and run the Databricks activity inside it.

    I did come up with a single-pipeline solution for multiple loops, but it is complicated and limited.

    Please let me know which you would like more information on, if any. Thank you for your patience.


  2. MartinJaffer-MSFT 26,236 Reputation points
    2021-05-17T21:50:27.28+00:00

    @Vinay5 below is the JSON for two pipelines meant to be used together. doFolder is the parent pipeline; it calls doFiles, the child pipeline, with a list of files.

    doFolder takes as input a list of folders. I don't know if you want to do all folders, a few folders, or something else, so I left it as a parameter input. It takes this list, iterates over the folders, and gets the childItems of each. These childItems are passed to doFiles.

    doFiles takes the childItems input, filters out any folders, then iterates over the files. Replace the Set Variable activity with your Databricks activity.

    ----------

    {  
        "name": "doFolder",  
        "properties": {  
            "activities": [  
                {  
                    "name": "ForEach1",  
                    "type": "ForEach",  
                    "dependsOn": [],  
                    "userProperties": [],  
                    "typeProperties": {  
                        "items": {  
                            "value": "@pipeline().parameters.folderlist",  
                            "type": "Expression"  
                        },  
                        "isSequential": false,  
                        "activities": [  
                            {  
                                "name": "Get Metadata1",  
                                "type": "GetMetadata",  
                                "dependsOn": [],  
                                "policy": {  
                                    "timeout": "7.00:00:00",  
                                    "retry": 0,  
                                    "retryIntervalInSeconds": 30,  
                                    "secureOutput": false,  
                                    "secureInput": false  
                                },  
                                "userProperties": [],  
                                "typeProperties": {  
                                    "dataset": {  
                                        "referenceName": "BinaryName",  
                                        "type": "DatasetReference",  
                                        "parameters": {  
                                            "folder": "@item()"  
                                        }  
                                    },  
                                    "fieldList": [  
                                        "childItems"  
                                    ],  
                                    "storeSettings": {  
                                        "type": "AzureBlobFSReadSettings",  
                                        "enablePartitionDiscovery": false  
                                    },  
                                    "formatSettings": {  
                                        "type": "BinaryReadSettings"  
                                    }  
                                }  
                            },  
                            {  
                                "name": "Execute Pipeline1",  
                                "type": "ExecutePipeline",  
                                "dependsOn": [  
                                    {  
                                        "activity": "Get Metadata1",  
                                        "dependencyConditions": [  
                                            "Succeeded"  
                                        ]  
                                    }  
                                ],  
                                "userProperties": [],  
                                "typeProperties": {  
                                    "pipeline": {  
                                        "referenceName": "doFiles",  
                                        "type": "PipelineReference"  
                                    },  
                                    "waitOnCompletion": false,  
                                    "parameters": {  
                                        "filelist": {  
                                            "value": "@activity('Get Metadata1').output.childItems",  
                                            "type": "Expression"  
                                        }  
                                    }  
                                }  
                            }  
                        ]  
                    }  
                }  
            ],  
            "parameters": {  
                "folderlist": {  
                    "type": "array",  
                    "defaultValue": [  
                        "input"  
                    ]  
                }  
            },  
            "variables": {  
                "filelist": {  
                    "type": "Array",  
                    "defaultValue": [  
                        "input"  
                    ]  
                }  
            },  
            "annotations": []  
        }  
    }  
    

    ----------

    {  
        "name": "doFiles",  
        "properties": {  
            "activities": [  
                {  
                    "name": "Filter1",  
                    "type": "Filter",  
                    "dependsOn": [],  
                    "userProperties": [],  
                    "typeProperties": {  
                        "items": {  
                            "value": "@pipeline().parameters.filelist",  
                            "type": "Expression"  
                        },  
                        "condition": {  
                            "value": "@equals(item().type,'File')",  
                            "type": "Expression"  
                        }  
                    }  
                },  
                {  
                    "name": "ForEach1",  
                    "type": "ForEach",  
                    "dependsOn": [  
                        {  
                            "activity": "Filter1",  
                            "dependencyConditions": [  
                                "Succeeded"  
                            ]  
                        }  
                    ],  
                    "userProperties": [],  
                    "typeProperties": {  
                        "items": {  
                            "value": "@activity('Filter1').output.value",  
                            "type": "Expression"  
                        },  
                        "activities": [  
                            {  
                                "name": "Set variable1",  
                                "type": "SetVariable",  
                                "dependsOn": [],  
                                "userProperties": [],  
                                "typeProperties": {  
                                    "variableName": "theFileName",  
                                    "value": {  
                                        "value": "@item().name",  
                                        "type": "Expression"  
                                    }  
                                }  
                            }  
                        ]  
                    }  
                }  
            ],  
            "parameters": {  
                "filelist": {  
                    "type": "array"  
                }  
            },  
            "variables": {  
                "theFileName": {  
                    "type": "String"  
                }  
            },  
            "annotations": []  
        }  
    }  
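
    The question asks for files to be processed strictly one after the other, whereas the two pipelines above run their loops and child pipelines concurrently ("isSequential": false in doFolder, no isSequential in doFiles, and "waitOnCompletion": false). One possible adjustment to the Execute Pipeline1 activity in doFolder is sketched below. It is untested, and the folder parameter is hypothetical: it does not exist in doFiles as posted and would have to be added to that pipeline's parameters as a string.

    ```json
    {
        "name": "Execute Pipeline1",
        "type": "ExecutePipeline",
        "dependsOn": [
            {
                "activity": "Get Metadata1",
                "dependencyConditions": [
                    "Succeeded"
                ]
            }
        ],
        "userProperties": [],
        "typeProperties": {
            "pipeline": {
                "referenceName": "doFiles",
                "type": "PipelineReference"
            },
            "waitOnCompletion": true,
            "parameters": {
                "filelist": {
                    "value": "@activity('Get Metadata1').output.childItems",
                    "type": "Expression"
                },
                "folder": {
                    "value": "@item()",
                    "type": "Expression"
                }
            }
        }
    }
    ```

    For a fully sequential run, "isSequential": true would presumably also be set on the ForEach1 activity in both pipelines. Passing the folder along matters because each childItems entry carries only a name and a type, so inside doFiles the full path could be built with something like @concat(pipeline().parameters.folder, '/', item().name).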
    
