Hi King Java,
Thank you for using the Microsoft Q&A platform and for posting your query here.
It seems you want to copy multiple files dynamically using an ADF pipeline. Taking the latest points from your follow-up query, the requirement is:
- I have a csv file (let's say "location.csv") that lists all 20+ locations (for example: California, Arizona, etc.) to match against the payroll file names.
- I have a pool of csv files (20+) containing payroll data, each for a different location.
- The pipeline should go through "location.csv", and whenever a location name matches the name of a payroll file, that matching payroll csv file should be ingested into Azure SQL. For example, if location.csv has a row with the value California, a payroll file whose name contains California (say, Payroll_California.csv; the exact naming pattern is an assumption here) would match and get copied.
Since you need to loop through the records in 'location.csv' and also loop through each of the payroll file names, nested looping is required. ADF does not allow placing one ForEach activity directly inside another, so the workaround is a two-level pipeline design. Kindly try the below approach:
Master pipeline:
- Use a Lookup activity to retrieve the data from the 'location.csv' file.
- Use a ForEach activity to loop through the Lookup output by putting the below expression in the ForEach items:
@activity('Lookup1').output.value
- Inside the ForEach, use an Execute Pipeline activity to call a child pipeline.
Child pipeline:
- Create a pipeline parameter called locations.
- Use a Get Metadata activity to get all the file names in your ADLS folder. Select 'Child items' in the field list.
- Use a ForEach activity to loop through the Get Metadata output with this expression:
@activity('Get Metadata1').output.childItems
- Inside the ForEach, use an If Condition activity to check whether the file name contains the location, using this expression:
@contains(string(item().name),pipeline().parameters.locations)
- Inside the True block, use a Copy activity with both the source and sink datasets parameterized.
In the Execute Pipeline activity of the master pipeline, pass the parameter value as: @item().Locations (here, Locations is assumed to be the column header in location.csv; it must match the column name returned by the Lookup). A sample of the Lookup output shape is shown below.
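For reference, assuming location.csv has a single column with the header Locations, the Lookup activity output (with 'First row only' unchecked) that the ForEach iterates over would be shaped like this; the values shown are only illustrative:
{
    "count": 2,
    "value": [
        { "Locations": "California" },
        { "Locations": "Arizona" }
    ]
}
Each ForEach iteration then receives one object from the value array, which is why @item().Locations resolves to the location name.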
Below is the master pipeline JSON:
{
    "name": "pipeline2",
    "properties": {
        "activities": [
            {
                "name": "Lookup1",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "DelimitedTextSource",
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "DelimitedTextReadSettings"
                        }
                    },
                    "dataset": {
                        "referenceName": "DelimitedText1",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Lookup1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Lookup1').output.value",
                        "type": "Expression"
                    },
                    "isSequential": false,
                    "activities": [
                        {
                            "name": "Execute Pipeline1",
                            "type": "ExecutePipeline",
                            "dependsOn": [],
                            "policy": {
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "pipeline": {
                                    "referenceName": "pipeline3",
                                    "type": "PipelineReference"
                                },
                                "waitOnCompletion": true,
                                "parameters": {
                                    "locations": {
                                        "value": "@item().Locations",
                                        "type": "Expression"
                                    }
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "annotations": []
    }
}
Child pipeline: below is a minimal sketch of how the child pipeline (pipeline3) could look. The dataset names PayrollFolder, PayrollSource, and AzureSqlTable1, and the dataset parameters fileName and tableName, are placeholders (assumptions for illustration); replace them with your own datasets.
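{
    "name": "pipeline3",
    "properties": {
        "parameters": {
            "locations": {
                "type": "string"
            }
        },
        "activities": [
            {
                "name": "Get Metadata1",
                "type": "GetMetadata",
                "dependsOn": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "PayrollFolder",
                        "type": "DatasetReference"
                    },
                    "fieldList": [ "childItems" ]
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Get Metadata1",
                        "dependencyConditions": [ "Succeeded" ]
                    }
                ],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Get Metadata1').output.childItems",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "If Condition1",
                            "type": "IfCondition",
                            "typeProperties": {
                                "expression": {
                                    "value": "@contains(string(item().name),pipeline().parameters.locations)",
                                    "type": "Expression"
                                },
                                "ifTrueActivities": [
                                    {
                                        "name": "Copy data1",
                                        "type": "Copy",
                                        "typeProperties": {
                                            "source": {
                                                "type": "DelimitedTextSource",
                                                "storeSettings": {
                                                    "type": "AzureBlobFSReadSettings"
                                                },
                                                "formatSettings": {
                                                    "type": "DelimitedTextReadSettings"
                                                }
                                            },
                                            "sink": {
                                                "type": "AzureSqlSink"
                                            }
                                        },
                                        "inputs": [
                                            {
                                                "referenceName": "PayrollSource",
                                                "type": "DatasetReference",
                                                "parameters": {
                                                    "fileName": {
                                                        "value": "@item().name",
                                                        "type": "Expression"
                                                    }
                                                }
                                            }
                                        ],
                                        "outputs": [
                                            {
                                                "referenceName": "AzureSqlTable1",
                                                "type": "DatasetReference",
                                                "parameters": {
                                                    "tableName": {
                                                        "value": "@pipeline().parameters.locations",
                                                        "type": "Expression"
                                                    }
                                                }
                                            }
                                        ]
                                    }
                                ]
                            }
                        }
                    ]
                }
            }
        ],
        "annotations": []
    }
}
For this sketch to work, the source dataset (PayrollSource) would need a string parameter fileName used in its file path, set as an expression like the below, and the sink dataset (AzureSqlTable1) would similarly need a tableName parameter used as its table name:
"fileName": {
    "value": "@dataset().fileName",
    "type": "Expression"
}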
Hope this helps. Kindly accept the answer by clicking on the Accept answer button. Thank you.