Incremental delta load from Cosmos DB to ADLS Gen2

Praveenraj R K 61 Reputation points Microsoft Employee
2023-06-16T18:10:52.9666667+00:00

Hi team,

I have a scenario where I need to incrementally load Cosmos DB for NoSQL data, which has a timestamp column, into an ADLS Gen2 account using ADF with user-assigned managed identity authentication. Could you please suggest an approach?

Azure Data Factory
An Azure service for ingesting, preparing, and transforming data at scale.

1 answer

  1. QuantumCache 20,681 Reputation points Moderator
    2023-06-19T15:41:43.29+00:00

    Hello @Praveenraj Ragupathi,

    Could you please add more info on the source data format, with a sample, just so we can understand how the data looks on the source side?

    Did you try using @pipeline().TriggerTime to get the start time of the pipeline run and using it in the query to retrieve only the new or updated records since the last run?
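One way to picture that idea: build the Cosmos DB query from the run's time window, so each run only picks up documents written inside its window. Below is a minimal Python sketch; the container alias `c`, the timestamp property name `ts`, and the helper function are illustrative assumptions, not ADF APIs:

```python
from datetime import datetime, timezone

def build_incremental_query(window_start: datetime, window_end: datetime) -> str:
    """Build a Cosmos DB SQL query returning only documents whose
    timestamp falls inside the current run's window.
    The property name 'ts' is an assumption -- substitute your own."""
    start = window_start.isoformat()
    end = window_end.isoformat()
    return (
        "SELECT * FROM c "
        f"WHERE c.ts >= '{start}' AND c.ts < '{end}'"
    )

# In ADF itself the bounds would come from expressions such as
# @pipeline().TriggerTime or, with a tumbling window trigger,
# @trigger().outputs.windowStartTime / @trigger().outputs.windowEndTime.
query = build_incremental_query(
    datetime(2023, 6, 16, tzinfo=timezone.utc),
    datetime(2023, 6, 17, tzinfo=timezone.utc),
)
```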

    Here's an approach you can use to perform an incremental load from Cosmos DB to ADLS Gen2 using Azure Data Factory (ADF) with user-assigned managed identity authentication:

    1. Create a Cosmos DB linked service: In ADF, create a linked service for Cosmos DB. Use the user-assigned managed identity authentication method to authenticate to Cosmos DB.

    2. Create an ADLS Gen2 linked service: In ADF, create a linked service for ADLS Gen2. Use the user-assigned managed identity authentication method to authenticate to ADLS Gen2.

    3. Create a pipeline: In ADF, create a pipeline that performs the incremental load from Cosmos DB to ADLS Gen2. The pipeline should have the following activities:

    a. Lookup activity: Use a Lookup activity to retrieve the maximum timestamp value (the watermark) from the previous load. Persist this watermark somewhere durable between runs, such as a small file in ADLS Gen2 or a control table; pipeline parameters and variables do not survive across runs.

    b. Copy activity with a Cosmos DB source: Use a Copy activity whose source reads from Cosmos DB with a query that filters on the timestamp column, retrieving only the documents whose timestamp is greater than the watermark from the previous load.

    c. Data flow activity: Use a data flow activity to transform the data as needed. You can use the derived column transformation to add a column for the current timestamp value.

    d. ADLS Gen2 sink: Write the data to ADLS Gen2, using the current timestamp value in the file or folder name so each run produces a distinct delta.

    e. Update the watermark: After a successful copy, overwrite the stored watermark with the new maximum timestamp so the next run starts from there.

    Schedule the pipeline: Attach a trigger (for example, a schedule or tumbling window trigger) so the pipeline runs on a regular basis, such as hourly or daily.
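For the linked services in the first two steps, the key detail for user-assigned managed identity is a Credential item in ADF that both linked services reference. The shapes below sketch those linked-service definitions, written as Python dicts for readability; the account URLs, resource names, and exact property placement can vary by connector version, so treat every value here as a placeholder:

```python
# Illustrative linked-service definitions, expressed as Python dicts.
# The credential reference ("my-uami-credential") points at an ADF
# Credential item wrapping the user-assigned managed identity; all
# names and endpoints below are placeholders, not real resources.
cosmos_linked_service = {
    "name": "CosmosDbLS",
    "properties": {
        "type": "CosmosDb",
        "typeProperties": {
            "accountEndpoint": "https://<account>.documents.azure.com:443/",
            "database": "<database>",
            "credential": {
                "referenceName": "my-uami-credential",
                "type": "CredentialReference",
            },
        },
    },
}

adls_linked_service = {
    "name": "AdlsGen2LS",
    "properties": {
        "type": "AzureBlobFS",
        "typeProperties": {
            "url": "https://<account>.dfs.core.windows.net",
            "credential": {
                "referenceName": "my-uami-credential",
                "type": "CredentialReference",
            },
        },
    },
}
```

Both linked services can share one credential item, provided the managed identity has the needed data-plane roles on each resource.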
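The high-watermark flow in steps a–e can be sketched end to end. This is a minimal, self-contained Python model using in-memory stand-ins for Cosmos DB and ADLS Gen2; all names are illustrative, and in ADF the same roles are played by the Lookup activity, the filtered Copy activity, and the watermark update:

```python
from datetime import datetime, timezone

def incremental_load(source_docs, sink, watermark_store, key="last_ts"):
    # (a) Look up the watermark left by the previous run.
    last_ts = watermark_store.get(key, "")
    # (b) Keep only documents newer than the watermark (same-format
    # ISO-8601 timestamps compare correctly as plain strings).
    new_docs = [d for d in source_docs if d["ts"] > last_ts]
    if not new_docs:
        return 0
    # (d) Write the delta to a file named after the current run time.
    run_ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    sink[f"delta_{run_ts}.json"] = new_docs
    # (e) Persist the new maximum timestamp for the next run.
    watermark_store[key] = max(d["ts"] for d in new_docs)
    return len(new_docs)

docs = [
    {"id": "1", "ts": "2023-06-15T00:00:00Z"},
    {"id": "2", "ts": "2023-06-16T12:00:00Z"},
]
sink, wm = {}, {"last_ts": "2023-06-15T23:59:59Z"}
loaded = incremental_load(docs, sink, wm)  # only doc "2" is newer than the watermark
```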

