How do I dynamically read multiple file names?

King Java 790 Reputation points
2024-11-19T21:48:45.4566667+00:00

I am trying to read the names of 20+ CSV files and ingest their data dynamically.

Below is a diagram of how I ingest data from each CSV file individually.

[screenshot: per-file ingestion pipeline]

So I have at least 20+ pipelines, one for each CSV file.

Below is what each CSV file's name looks like:

[screenshot: CSV file names]

If the variation were only a short string, I think using Parameters like in the screenshot below would work, but here I would need to insert a long string for each file.

Example (of a short variation that I use in another case): [screenshot: parameter setup]

What is the ideal way to ingest 20+ CSV files whose names follow a similar pattern, except for a location (e.g., California, Oregon, Arizona) in the middle of the file name?

Should I create a CSV file that has a list of locations and somehow read it through inside the parameter values?

Azure Data Factory

Accepted answer
  1. AnnuKumari-MSFT 34,556 Reputation points Microsoft Employee Moderator
    2024-11-25T12:35:17.06+00:00

    Hi King Java,

    Thank you for using the Microsoft Q&A platform and for posting your query here.

    It seems you want to copy multiple files dynamically using an ADF pipeline. I will take the latest points from your follow-up query to address the requirement:

    1. You have a CSV file (say "location.csv") that lists all 20+ locations (for example: California, Arizona, etc.), matching parts of the CSV file names.
    2. You have a pool of 20+ CSV files containing payroll data (each for a different location).
    3. The pipeline should go through "location.csv", and whenever a location name matches the name of a payroll file, that payroll CSV should be ingested into Azure SQL.

    Since you need to loop through the records in 'location.csv' and also loop through each of the payroll file names, nested looping is required. ADF does not allow nesting a ForEach activity directly inside another ForEach, so use two levels of pipelines. Kindly try the below approach:

    Master_Pipeline:

    1. Use a Lookup activity to retrieve the data from the 'location.csv' file.
    2. Use a ForEach activity to loop through the Lookup output by putting the below expression in the ForEach items:
         @activity('Lookup1').output.value

    3. Inside the ForEach, use an Execute Pipeline activity to call the child pipeline.
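
    For reference, 'location.csv' could be as simple as the sketch below. The header name 'Locations' is an assumption; whatever header you use must match the column referenced by @item().Locations in the Execute Pipeline parameter:

         Locations
         California
         Oregon
         Arizona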

    Child pipeline:

    1. Create a pipeline parameter called locations.
    2. Use a Get Metadata activity to get all the file names in your ADLS. Use 'childItems' in the field list.
    3. Use a ForEach activity to loop through the Get Metadata output with this expression:
         @activity('Get Metadata1').output.childItems

    4. Inside the ForEach, use an If Condition to check whether the file name contains the location, using this expression:
         @contains(string(item().name),pipeline().parameters.locations)

    5. Inside the True block, use a Copy activity, parameterizing both the source and sink datasets (a sketch of the full child pipeline JSON follows the master pipeline JSON below).

    In the Execute Pipeline activity of the master pipeline, pass the parameter value as @item().Locations, where 'Locations' must match the column header in 'location.csv' (the JSON below uses 'Locations').

    Below is the master pipeline JSON:

    {
        "name": "pipeline2",
        "properties": {
            "activities": [
                {
                    "name": "Lookup1",
                    "type": "Lookup",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "source": {
                            "type": "DelimitedTextSource",
                            "storeSettings": {
                                "type": "AzureBlobFSReadSettings",
                                "recursive": true,
                                "enablePartitionDiscovery": false
                            },
                            "formatSettings": {
                                "type": "DelimitedTextReadSettings"
                            }
                        },
                        "dataset": {
                            "referenceName": "DelimitedText1",
                            "type": "DatasetReference"
                        },
                        "firstRowOnly": false
                    }
                },
                {
                    "name": "ForEach1",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Lookup1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@activity('Lookup1').output.value",
                            "type": "Expression"
                        },
                        "isSequential": false,
                        "activities": [
                            {
                                "name": "Execute Pipeline1",
                                "type": "ExecutePipeline",
                                "dependsOn": [],
                                "policy": {
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "pipeline": {
                                        "referenceName": "pipeline3",
                                        "type": "PipelineReference"
                                    },
                                    "waitOnCompletion": true,
                                    "parameters": {
                                        "locations": {
                                            "value": "@item().Locations",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            }
                        ]
                    }
                }
            ],
            "annotations": []
        }
    }
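
    And below is a minimal sketch of what the child pipeline ('pipeline3') could look like. The dataset names ('PayrollFolder' for the ADLS folder, 'PayrollCsv' for a parameterized source file, 'AzureSqlTable1' for the sink) and the 'fileName' dataset parameter are assumptions; swap in your own datasets:

    {
        "name": "pipeline3",
        "properties": {
            "parameters": {
                "locations": { "type": "string" }
            },
            "activities": [
                {
                    "name": "Get Metadata1",
                    "type": "GetMetadata",
                    "typeProperties": {
                        "dataset": { "referenceName": "PayrollFolder", "type": "DatasetReference" },
                        "fieldList": [ "childItems" ],
                        "storeSettings": { "type": "AzureBlobFSReadSettings", "enablePartitionDiscovery": false },
                        "formatSettings": { "type": "DelimitedTextReadSettings" }
                    }
                },
                {
                    "name": "ForEach1",
                    "type": "ForEach",
                    "dependsOn": [
                        { "activity": "Get Metadata1", "dependencyConditions": [ "Succeeded" ] }
                    ],
                    "typeProperties": {
                        "items": { "value": "@activity('Get Metadata1').output.childItems", "type": "Expression" },
                        "activities": [
                            {
                                "name": "If Condition1",
                                "type": "IfCondition",
                                "typeProperties": {
                                    "expression": {
                                        "value": "@contains(string(item().name), pipeline().parameters.locations)",
                                        "type": "Expression"
                                    },
                                    "ifTrueActivities": [
                                        {
                                            "name": "Copy data1",
                                            "type": "Copy",
                                            "typeProperties": {
                                                "source": {
                                                    "type": "DelimitedTextSource",
                                                    "storeSettings": { "type": "AzureBlobFSReadSettings" },
                                                    "formatSettings": { "type": "DelimitedTextReadSettings" }
                                                },
                                                "sink": { "type": "AzureSqlSink" }
                                            },
                                            "inputs": [
                                                {
                                                    "referenceName": "PayrollCsv",
                                                    "type": "DatasetReference",
                                                    "parameters": {
                                                        "fileName": { "value": "@item().name", "type": "Expression" }
                                                    }
                                                }
                                            ],
                                            "outputs": [
                                                { "referenceName": "AzureSqlTable1", "type": "DatasetReference" }
                                            ]
                                        }
                                    ]
                                }
                            }
                        ]
                    }
                }
            ],
            "annotations": []
        }
    }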

    [screenshots: master pipeline setup]

    Child pipeline: [screenshots: child pipeline setup]

    Hope it helps. Kindly accept the answer by clicking on the 'Accept answer' button. Thank you.


1 additional answer

  1. phemanth 15,755 Reputation points Microsoft External Staff Moderator
    2024-11-19T22:31:04.2+00:00

    @King Java

    Thanks for using the Microsoft Q&A forum and posting your query.

    To dynamically read and ingest data from multiple CSV files with similar naming patterns, you can streamline your process significantly. Here's a structured approach:

    1. Use a List of Locations

    Create a CSV file (or a simple list) that contains all the location names (e.g., California, Oregon, Arizona). This will allow you to easily iterate through the locations when constructing your file names.

    2. Construct File Names Dynamically

    You can use a programming language like Python to dynamically generate the file names based on the locations. Here’s a simple example using Python:

    import pandas as pd

    # List of locations
    locations = ['California', 'Oregon', 'Arizona']  # Add more locations as needed

    # Base file name pattern
    base_file_name = "data_{}_2024.csv"  # Adjust the pattern to your file naming convention

    # List to hold the successfully loaded DataFrames
    dataframes = []

    # Loop through each location and read the corresponding CSV file
    for location in locations:
        file_name = base_file_name.format(location)
        try:
            df = pd.read_csv(file_name)
            dataframes.append(df)
            print(f"Successfully read {file_name}")
        except FileNotFoundError:
            print(f"File {file_name} not found.")

    # Optionally, concatenate all DataFrames into one.
    # Guard against an empty list: pd.concat raises ValueError if no files were read.
    all_data = pd.concat(dataframes, ignore_index=True) if dataframes else pd.DataFrame()

    3. Parameterize Your Pipeline

    If you are using a data pipeline tool (such as Apache Airflow or Azure Data Factory), you can parameterize the location names. That way, you can pass the list of locations as parameters to your pipeline, which then constructs the file names dynamically, as in the expression sketch below.
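
    For example, in ADF a parameterized dataset could assemble the file name with an expression like the following (the 'data_' prefix, '_2024.csv' suffix, and the 'location' dataset parameter are assumptions based on the naming pattern above):

         @concat('data_', dataset().location, '_2024.csv')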

    4. Error Handling

    Make sure to include error handling (as shown in the example) to manage cases where a file might not exist. This will help you avoid breaking your pipeline if one file is missing.

    5. Considerations for Scalability

    If you anticipate needing to add more locations or file types in the future, consider storing your location list in a database or a configuration file. This will make it easier to manage and update.
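
    As a minimal sketch, reading the list from such a configuration file in Python could look like this (the file name 'locations.csv' and the 'Location' column header are assumptions):

    import csv

    # Load the location list from a config file instead of hardcoding it.
    # "locations.csv" and the "Location" column name are assumptions.
    with open("locations.csv", newline="") as f:
        locations = [row["Location"] for row in csv.DictReader(f)]
    print(locations)  # e.g. ['California', 'Oregon', 'Arizona']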

    Hope this helps. Do let us know if you have any further queries.

