Steps to Implement a Metadata-Driven File Processing Pipeline in ADF
Start by creating a SQL metadata table to store configuration details for each source system. This table should include columns like SourceSystem, FolderPath, FilePattern, RawTable, and CleanTable. This allows the pipeline to drive its logic based on metadata instead of hardcoding values.
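As a rough sketch (the table name, column sizes, and sample rows below are purely illustrative — adjust them to your own naming standards), the metadata table could look like this:

```sql
-- Hypothetical metadata table; names and sizes are placeholders.
CREATE TABLE dbo.SourceSystemMetadata
(
    SourceSystem  VARCHAR(50)   NOT NULL PRIMARY KEY,
    FolderPath    VARCHAR(500)  NOT NULL,  -- folder in ADLS, e.g. landing/sap
    FilePattern   VARCHAR(200)  NOT NULL,  -- file name prefix to match
    RawTable      VARCHAR(200)  NOT NULL,  -- staging (raw) table
    CleanTable    VARCHAR(200)  NOT NULL   -- curated (clean) table
);

INSERT INTO dbo.SourceSystemMetadata (SourceSystem, FolderPath, FilePattern, RawTable, CleanTable)
VALUES
    ('SAP',    'landing/sap',    'SAP_Sales_',    'raw.SAP_Sales',    'clean.UnifiedSales'),
    ('Senior', 'landing/senior', 'Senior_Sales_', 'raw.Senior_Sales', 'clean.UnifiedSales');
```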
Next, define a pipeline parameter in Azure Data Factory called SourceSystem. This parameter will be passed by triggers for each source system (e.g., "SAP", "Senior") to control the pipeline’s behavior dynamically.
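In the pipeline JSON (or the Parameters tab in the ADF UI), this is just a string parameter; the default value here is only an example:

```json
"parameters": {
    "SourceSystem": {
        "type": "string",
        "defaultValue": "SAP"
    }
}
```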
Use a Lookup activity to query the metadata table using the SourceSystem parameter. This fetches dynamic values such as the folder path in ADLS, the file name pattern, and the target table names.
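For the Lookup's query (with "First row only" enabled), you can embed the parameter using ADF string interpolation. The table name matches the sketch above and is, again, just an assumption:

```sql
SELECT FolderPath, FilePattern, RawTable, CleanTable
FROM dbo.SourceSystemMetadata
WHERE SourceSystem = '@{pipeline().parameters.SourceSystem}'
```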
To detect available files, add a Get Metadata activity to list all files in the source folder (retrieved from metadata). Then, apply a Filter activity to keep only files that match the desired pattern (e.g., SAP_Sales_*.xlsx).
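Assuming the Get Metadata activity is named GetFileList and requests the "Child items" field, and that the metadata Lookup is named LookupMetadata with FilePattern holding a prefix like SAP_Sales_ (all of these names are placeholders), the Filter activity settings could look roughly like this:

```
Items:     @activity('GetFileList').output.childItems
Condition: @and(startswith(item().name, activity('LookupMetadata').output.firstRow.FilePattern), endswith(item().name, '.xlsx'))
```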
Loop through the filtered file list using ForEach, and within the loop, use another Get Metadata activity to fetch the LastModified timestamp for each file. Store file names and timestamps in an array variable.
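If you build the array with Append Variable, set the ForEach to sequential execution so the appends do not race. A sketch with placeholder activity names (FilterFiles, GetFileDetails) and an array variable FileList:

```
ForEach items: @activity('FilterFiles').output.Value

Append Variable (FileList):
  @json(concat('{"name":"', item().name, '","lastModified":"', activity('GetFileDetails').output.lastModified, '"}'))
```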
After collecting all timestamps, use pipeline expressions over the collected array (or a Data Flow) to identify the most recently modified file. This ensures the pipeline always processes the latest file rather than relying on alphabetical file-name ordering.
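One expression-only variant keeps a pair of running variables (say LatestFile and LatestModified, with LatestModified defaulted to something like '1900-01-01T00:00:00Z') and does the comparison while still inside the ForEach, which avoids post-processing the array. Since the comparison happens in the If Condition rather than in the Set Variable itself, there is no self-referencing variable problem:

```
If Condition: @greater(ticks(activity('GetFileDetails').output.lastModified), ticks(variables('LatestModified')))

True branch:
  Set Variable LatestModified = @activity('GetFileDetails').output.lastModified
  Set Variable LatestFile     = @item().name
```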
Before processing, check if the file has already been handled. Use another Lookup activity to query a log table in SQL that records all previously processed files. If the file exists in the log, skip it.
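Assuming the winning file name ended up in a variable called LatestFile and the log table is dbo.FileProcessingLog (both placeholders; the log table itself is sketched under the logging step below), the Lookup query could be:

```sql
SELECT COUNT(*) AS ProcessedCount
FROM dbo.FileProcessingLog
WHERE FileName = '@{variables('LatestFile')}'
  AND Status   = 'Success'
```

Filtering on Status = 'Success' matters here because skipped and failed runs are also written to the same log.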
Use an If Condition activity to control the flow. If the file is new, proceed to process it. If it's already logged, skip processing and record a "Skipped" status in the log table.
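With the log Lookup above set to "First row only" and named, say, CheckProcessedLog, the If Condition expression reduces to a simple count check; the True branch copies, transforms and logs Success, while the False branch only logs Skipped:

```
@equals(activity('CheckProcessedLog').output.firstRow.ProcessedCount, 0)
```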
For new files, use a Copy Activity with a dynamic dataset to load data into the Raw table. The dataset should accept folderPath and fileName as parameters, populated from the earlier metadata Lookup and the latest-file logic.
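A sketch of how the dynamic source dataset and the Copy activity fit together (every name here is illustrative; the same idea works for Excel, CSV or Parquet datasets on ADLS):

```
Dataset parameters: folderPath (string), fileName (string)
Dataset file path:  @dataset().folderPath / @dataset().fileName

Copy activity - source dataset parameter values:
  folderPath: @activity('LookupMetadata').output.firstRow.FolderPath
  fileName:   @variables('LatestFile')
```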
After loading into Raw, use a Mapping Data Flow or a Stored Procedure activity to transform and load the data into the Clean unified sales table, as defined in your metadata table.
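If you take the Stored Procedure route, a minimal sketch might look like the one below. Since the raw and clean table names come from the metadata table, the procedure builds the statement dynamically; the column names (OrderId, OrderDate, Amount) are placeholders for your real sales columns:

```sql
-- Illustrative only: table names arrive from the metadata lookup (a controlled source),
-- so they are concatenated into dynamic SQL here.
CREATE OR ALTER PROCEDURE dbo.usp_LoadCleanSales
    @SourceSystem NVARCHAR(50),
    @RawTable     NVARCHAR(200),
    @CleanTable   NVARCHAR(200)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @sql NVARCHAR(MAX) = N'
        INSERT INTO ' + @CleanTable + N' (SourceSystem, OrderId, OrderDate, Amount)
        SELECT @src, OrderId, OrderDate, Amount
        FROM ' + @RawTable + N';';

    EXEC sp_executesql @sql, N'@src NVARCHAR(50)', @src = @SourceSystem;
END
```

The Stored Procedure activity passes @RawTable and @CleanTable from the metadata Lookup output, e.g. @activity('LookupMetadata').output.firstRow.RawTable.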
Finally, log the processing status using a Stored Procedure, recording details like file name, source system, timestamp, and status (Success, Skipped, or Failed) into a centralized audit table.
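A possible shape for the audit table and logging procedure (placeholders again; extend with row counts, error messages, duration, etc. as needed). This is the same dbo.FileProcessingLog table that the duplicate check above queries:

```sql
CREATE TABLE dbo.FileProcessingLog
(
    LogId         INT IDENTITY(1,1) PRIMARY KEY,
    SourceSystem  VARCHAR(50)  NOT NULL,
    FileName      VARCHAR(500) NOT NULL,
    ProcessedAt   DATETIME2    NOT NULL DEFAULT SYSUTCDATETIME(),
    Status        VARCHAR(20)  NOT NULL   -- 'Success', 'Skipped' or 'Failed'
);
GO

CREATE OR ALTER PROCEDURE dbo.usp_LogFileProcessing
    @SourceSystem VARCHAR(50),
    @FileName     VARCHAR(500),
    @Status       VARCHAR(20)
AS
BEGIN
    SET NOCOUNT ON;
    INSERT INTO dbo.FileProcessingLog (SourceSystem, FileName, Status)
    VALUES (@SourceSystem, @FileName, @Status);
END
```

In the Stored Procedure activity, the parameter values are dynamic content such as @pipeline().parameters.SourceSystem and @variables('LatestFile').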
To automate this, create multiple time-based triggers, each configured for a specific source system and time. Each trigger will pass the appropriate SourceSystem value to the pipeline for scheduled execution.
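Each schedule trigger then pins the parameter to its source system. A rough sketch of one such trigger (the trigger name, times, and pipeline reference are examples only):

```json
{
  "name": "TR_SAP_Daily_0600",
  "properties": {
    "type": "ScheduleTrigger",
    "typeProperties": {
      "recurrence": {
        "frequency": "Day",
        "interval": 1,
        "startTime": "2024-01-01T06:00:00Z",
        "timeZone": "UTC"
      }
    },
    "pipelines": [
      {
        "pipelineReference": {
          "referenceName": "PL_ProcessSourceFiles",
          "type": "PipelineReference"
        },
        "parameters": { "SourceSystem": "SAP" }
      }
    ]
  }
}
```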
I hope this helps! Let me know if you have any other questions or need further assistance.