Incremental load in ADF using rowversion column for multiple tables
Hello Team,
I am trying to perform an incremental load in ADF for multiple tables in a single pipeline, but the tables I have contain a rowversion column. There is no date-time or identity column that I can use for the incremental load. I believe rowversion can also be used to load data incrementally, but how can I implement it in ADF? I was not able to find any reference articles on this.
I would appreciate it if anyone could guide me on this or point me towards some reference links.
Any help and guidance is much appreciated.
Thank You ,
Gp
Azure Data Factory
-
phemanth 10,555 Reputation points • Microsoft Vendor
2023-09-28T08:57:13.47+00:00 Thanks for approaching Microsoft Q&A.
To implement incremental load in ADF using a row version column, you can follow these steps:
Create a source dataset that includes the row version column in the query. You can use the following query to retrieve the row version column:
SELECT [rowversion_column], [other_columns] FROM [table_name]
Create a sink dataset that includes the same row version column and other columns as the source dataset.
Create a pipeline that uses the source and sink datasets and a copy activity.
In the copy activity, set the source query to retrieve only the rows that have a row version greater than the last row version that was processed. You can use the following query to achieve this:
SELECT [rowversion_column], [other_columns] FROM [table_name] WHERE [rowversion_column] > @last_rowversion
Here, @last_rowversion is a parameter that stores the value of the last rowversion that was processed. You can use a Lookup activity to retrieve this value from a control table, or a Set Variable activity that stores the value in a variable.
Set the sink write behavior to "Insert" to insert the new rows into the destination table.
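If it helps to see the watermark pattern end to end, here is a minimal sketch; the table dbo.Watermark, its columns, and the activity name LookupWatermark are assumptions for illustration, not fixed names:

-- Control table that stores the last processed rowversion per source table.
CREATE TABLE dbo.Watermark (
    TableName       SYSNAME    NOT NULL PRIMARY KEY,
    LastRowVersion  BINARY(8)  NOT NULL DEFAULT 0x0000000000000000
);

-- Lookup activity query: rowversion is binary(8), so converting it with
-- style 1 yields a '0x...' hex string that is easy to pass around in ADF.
SELECT CONVERT(NVARCHAR(23), LastRowVersion, 1) AS LastRowVersion
FROM dbo.Watermark
WHERE TableName = 'MyTable';

-- Copy activity source query, built by string interpolation in ADF, e.g.:
-- SELECT [rowversion_column], [other_columns] FROM [table_name]
-- WHERE [rowversion_column] > @{activity('LookupWatermark').output.firstRow.LastRowVersion}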
For more information on implementing incremental load in ADF, you can refer to the following documentation links:
- Incremental copy in Azure Data Factory
- Copy and transform data in Azure Data Factory using Mapping Data Flows
If you have further queries, please do let us know.
-
phemanth 10,555 Reputation points • Microsoft Vendor
2023-09-29T09:32:43.0333333+00:00 We haven't heard from you on the last response and were just checking back to see if you have a resolution yet. If you do, please share it with the community, as it can be helpful to others. Otherwise, we will respond with more details and try to help.
-
Gauri Pawse 0 Reputation points
2023-09-29T10:05:20.45+00:00 Thank you for your response. Yes, I have got my data pipeline working; I designed it in almost the same way as the one you mentioned. But I am struggling to update my watermark table with the max rowversion. I am using a Script activity after the Copy Data activity to update the watermark table value with a SQL script:
UPDATE Watermark
SET [row version] = (SELECT MAX(ColRversion) FROM fact.[@{item().TABLE_NAME}])
WHERE TableName = '@{activity('tablelist').output.firstRow.TableName}'
Here, the tablelist activity is the one through which I am passing my table names.
-
phemanth 10,555 Reputation points • Microsoft Vendor
2023-09-30T09:40:07.1466667+00:00 One possible solution is to use a Lookup activity in Azure Data Factory to get the maximum rowversion from the fact table, store it in a pipeline variable with a Set Variable activity, and then use that variable to update the watermark table. Here are the steps:
- Add a Lookup activity to the pipeline after the Copy Data activity that copies data from the fact table. In the Lookup activity, set the source dataset to the fact table and use a query to get the maximum row version. For example:
SELECT MAX(ColRversion) AS MaxRowVersion FROM fact.[@{item().TABLE_NAME}]
- Add a Set Variable activity to the pipeline after the Lookup activity. In the Set Variable activity, set the variable to the value of the maximum row version returned by the Lookup activity. For example:
{ "name": "Set Watermark", "type": "SetVariable", "dependsOn": [ { "activity": "Lookup Max Row Version", "dependencyConditions": [ "Succeeded" ] } ], "userProperties": [], "typeProperties": { "variableName": "WatermarkValue", "value": "@activity('Lookup Max Row Version').output.firstRow.MaxRowVersion" } }
- Modify the SQL script activity to use the variable to update the watermark table. For example:
UPDATE Watermark
SET [row version] = @{variables('WatermarkValue')}
WHERE TableName = '@{activity('tablelist').output.firstRow.TableName}'
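One caveat worth flagging: rowversion is a binary(8) value, while ADF pipeline variables are strings, so storing the raw Lookup output in a String variable can mangle it. A minimal sketch of a workaround, reusing the column names above, is to convert on the SQL side:

-- Lookup query variant that returns the max rowversion as a '0x...' hex string,
-- which can be stored safely in a String pipeline variable.
SELECT CONVERT(NVARCHAR(23), MAX(ColRversion), 1) AS MaxRowVersion
FROM fact.[@{item().TABLE_NAME}]
-- Style 1 keeps the '0x' prefix, so the resulting string (e.g. '0x00000000000007D1')
-- can later be spliced directly into a T-SQL comparison such as:
-- WHERE [rowversion_column] > 0x00000000000007D1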
-
phemanth 10,555 Reputation points • Microsoft Vendor
2023-10-03T09:14:20.5466667+00:00 Just checking back to see if you have a resolution yet. If you do, please share it with the community, as it can be helpful to others. Otherwise, we will respond with more details and try to help.
-
Gauri Pawse 0 Reputation points
2023-10-03T11:24:21.84+00:00 @phemanth I just realised that my incremental load is not working with rowversion. How can I implement insert, update, and delete behaviour with rowversion?
-
phemanth 10,555 Reputation points • Microsoft Vendor
2023-10-04T09:23:58.0833333+00:00 To implement insert, update, and delete behavior with rowversion, you can use the T-SQL MERGE statement in SQL Server (invoked, for example, from a Script or Stored Procedure activity in ADF). The MERGE statement compares the source and destination tables and performs insert, update, and delete operations based on the rowversion column. Here are the general steps (a sketch of the MERGE follows the list):
- Create a staging table to hold the data from the source table.
- Use the Merge statement to compare the staging table with the destination table based on the rowversion column.
- Insert new rows from the staging table to the destination table.
- Update existing rows in the destination table with the corresponding rows from the staging table.
- Delete rows from the destination table that do not exist in the staging table.
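A minimal sketch of such a MERGE, assuming a staging table stg.MyTable that holds a full copy of the source, a target table dbo.MyTable, and a key column Id (all names here are illustrative):

-- ColRversion on the target is stored as plain binary(8), copied from the
-- source's rowversion column (a rowversion column itself cannot be inserted into).
MERGE dbo.MyTable AS tgt
USING stg.MyTable AS src
    ON tgt.Id = src.Id
WHEN MATCHED AND tgt.ColRversion <> src.ColRversion THEN
    UPDATE SET tgt.Col1 = src.Col1,
               tgt.Col2 = src.Col2,
               tgt.ColRversion = src.ColRversion
WHEN NOT MATCHED BY TARGET THEN
    INSERT (Id, Col1, Col2, ColRversion)
    VALUES (src.Id, src.Col1, src.Col2, src.ColRversion)
WHEN NOT MATCHED BY SOURCE THEN
    DELETE;

Note that the DELETE branch is only safe if the staging table contains the complete current source set, not just the changed rows.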
For further queries, please do let us know.
-
Gauri Pawse 0 Reputation points
2023-10-04T12:05:22.5266667+00:00 @phemanth I understand the logic for insert and update, but for delete I have a slight doubt. I will be doing truncate-and-load for my staging part, so staging will only hold the new rows each time the ADF pipeline runs and truncate the rest. That means most of the rows in the target will not be present in staging, and as per the delete logic it would delete almost everything on the target side. Please correct me if I am wrong.
-
phemanth 10,555 Reputation points • Microsoft Vendor
2023-10-05T08:38:29.79+00:00 There are other ways to delete data, such as using a Stored Procedure activity to delete records from the target table that have already been deleted from the source tables since the last data load.
Additionally, in a Mapping Data Flow you can enable the "Allow delete" option in the sink settings (together with an Alter Row transformation) to delete rows from the target table based on a condition.
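As a rough sketch of the stored-procedure approach, assuming you land just the source's current keys in a table stg.MyTableKeys on every run (all names here are illustrative, and this avoids the mass-delete problem of staging only the changed rows):

-- Deletes target rows whose keys no longer exist in the source. Assumes a
-- lightweight keys-only staging table refreshed on each pipeline run.
CREATE OR ALTER PROCEDURE dbo.DeleteMissingRows
AS
BEGIN
    SET NOCOUNT ON;

    DELETE tgt
    FROM dbo.MyTable AS tgt
    WHERE NOT EXISTS (
        SELECT 1
        FROM stg.MyTableKeys AS k
        WHERE k.Id = tgt.Id
    );
END;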
For any other queries, please do let us know.
-
phemanth 10,555 Reputation points • Microsoft Vendor
2023-10-06T12:00:01.32+00:00 We haven't heard from you on the last response and were just checking back to see if you have a resolution yet. If you do, please share it with the community, as it can be helpful to others. Otherwise, we will respond with more details and try to help.