Hello @Praveenraj Ragupathi,
Could you please add more information about the source data format, along with a sample? Just to understand what the data looks like on the source side.
Did you try using @pipeline().TriggerTime to capture the start time of the pipeline run and using it in the query to retrieve only the records created or updated since the last run?
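For example, assuming your documents carry an ISO-8601 string property (here called `lastModified` for illustration) and the pipeline runs on a daily schedule, the copy source query could look something like this:

```sql
-- Sketch only: 'lastModified' is a hypothetical property on your documents.
-- adddays() shifts TriggerTime back one day to cover the previous daily window.
SELECT * FROM c
WHERE c.lastModified > '@{formatDateTime(adddays(pipeline().TriggerTime, -1), 'yyyy-MM-ddTHH:mm:ssZ')}'
```

This only works if the schedule interval matches the window you subtract; a stored watermark (described below) is more robust against missed or delayed runs.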
Here's an approach you can use to perform an incremental load from Cosmos DB to ADLS Gen2 using Azure Data Factory (ADF) with user-assigned managed identity authentication:
Create a Cosmos DB linked service: In ADF, create a linked service for Cosmos DB and use the user-assigned managed identity authentication method to authenticate to Cosmos DB.
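As a sketch, the linked service definition could look like the following. The account, database, and credential names are placeholders; the user-assigned managed identity must first be registered as a credential in ADF and granted access on the Cosmos DB account:

```json
{
  "name": "CosmosDbLinkedService",
  "properties": {
    "type": "CosmosDb",
    "typeProperties": {
      "accountEndpoint": "https://<your-account>.documents.azure.com:443/",
      "database": "<your-database>",
      "credential": {
        "referenceName": "<your-uami-credential>",
        "type": "CredentialReference"
      }
    }
  }
}
```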
Create an ADLS Gen2 linked service: In ADF, create a linked service for ADLS Gen2 and use the user-assigned managed identity authentication method to authenticate to ADLS Gen2.
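A similar sketch for the ADLS Gen2 side (the storage account URL and credential name are placeholders; the identity needs an appropriate role such as Storage Blob Data Contributor on the account):

```json
{
  "name": "AdlsGen2LinkedService",
  "properties": {
    "type": "AzureBlobFS",
    "typeProperties": {
      "url": "https://<your-storage-account>.dfs.core.windows.net",
      "credential": {
        "referenceName": "<your-uami-credential>",
        "type": "CredentialReference"
      }
    }
  }
}
```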
Create a pipeline: In ADF, create a pipeline that performs the incremental load from Cosmos DB to ADLS Gen2. The pipeline should have the following activities:
a. Lookup activity: Use a Lookup activity to get the maximum timestamp value (the watermark) from the previous load, for example from a watermark file or control table. You can store this value in a parameter or variable.
b. Copy activity with a Cosmos DB source: Use a Copy activity whose source reads from Cosmos DB. Use a query to filter the data on the timestamp column (for example the system `_ts` property), retrieving only documents whose timestamp is greater than the watermark from the previous load.
c. Data flow activity: Use a data flow activity to transform the data as needed. For example, you can use the derived column transformation to add a column holding the current timestamp.
d. ADLS Gen2 sink: Write the data to ADLS Gen2, using the current timestamp as part of the file or folder name so each run lands in its own partition.
e. Update the watermark: Persist the new maximum timestamp for the next run, for example by writing it back to the watermark file or control table. Note that a Set Variable activity alone is not enough here, because variable values do not persist across pipeline runs.
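Putting steps a and b together, the Copy activity's source could reference the Lookup output like this. This is a sketch: `LookupWatermark` is a hypothetical activity name, and it assumes the stored watermark is an epoch-seconds value comparable to Cosmos DB's system `_ts` property:

```json
"source": {
  "type": "CosmosDbSqlApiSource",
  "query": "SELECT * FROM c WHERE c._ts > @{activity('LookupWatermark').output.firstRow.watermark}"
}
```

Using `_ts` avoids depending on an application-maintained timestamp column, since Cosmos DB updates it automatically on every write.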
Schedule the pipeline: Attach a schedule (or tumbling window) trigger so the pipeline runs on a regular basis, such as daily or hourly.
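A minimal schedule trigger sketch, assuming a daily run and a pipeline named `IncrementalCosmosToAdls` (both the name and start time are placeholders):

```json
{
  "name": "DailyIncrementalTrigger",
  "properties": {
    "type": "ScheduleTrigger",
    "typeProperties": {
      "recurrence": {
        "frequency": "Day",
        "interval": 1,
        "startTime": "2024-01-01T00:00:00Z",
        "timeZone": "UTC"
      }
    },
    "pipelines": [
      {
        "pipelineReference": {
          "referenceName": "IncrementalCosmosToAdls",
          "type": "PipelineReference"
        }
      }
    ]
  }
}
```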