Hello @Neal Lockhart, can you try the ADF data flow below for incremental loading?
To support incremental data loading, we create a table to store the watermark values for each source table. This watermark table includes the following columns:
- `WatermarkTableName`: The name of the source table.
- `WatermarkColumn`: The name of the column used as the watermark (e.g., a timestamp or incremental ID).
- `WatermarkValue`: The last processed value of the watermark column.
Watermark Table
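```sql
CREATE TABLE [dbo].[WatermarkTable] (
    [WatermarkTableName] nvarchar(255) NULL,
    [WatermarkColumn]    nvarchar(255) NULL,
    [WatermarkValue]     nvarchar(255) NULL
) ON [PRIMARY];
GO
```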
Insert Initial Watermark Record
Next, insert the initial watermark record into the `WatermarkTable`. This entry should include the name of the source table, the column used for tracking changes (e.g., a timestamp or ID), and the last loaded value. This insert is a one-time setup; from then on, the `WatermarkValue` is updated after each incremental load.
In this case, the table will contain the following record:
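To try the setup outside SQL Server, the sketch below uses Python's built-in `sqlite3` module as a stand-in: it creates the three-column watermark table (all columns stored as text, mirroring `nvarchar`) and performs the one-time seed insert. The seed values (`Sales.Orders`, `LastEditedWhen`, `2013-01-01 00:00:00.000`) and the helper names are hypothetical placeholders, not the record from the original setup.

```python
import sqlite3

# Hypothetical seed values; replace them with your own source table, column, and start value.
INITIAL_RECORD = ("Sales.Orders", "LastEditedWhen", "2013-01-01 00:00:00.000")


def create_watermark_table(conn: sqlite3.Connection) -> None:
    """Create a SQLite stand-in for dbo.WatermarkTable (all columns stored as text)."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS WatermarkTable ("
        " WatermarkTableName TEXT,"
        " WatermarkColumn TEXT,"
        " WatermarkValue TEXT)"
    )


def seed_watermark(conn: sqlite3.Connection, table: str, column: str, value: str) -> None:
    """One-time setup: insert the initial watermark row for a source table."""
    conn.execute(
        "INSERT INTO WatermarkTable (WatermarkTableName, WatermarkColumn, WatermarkValue)"
        " VALUES (?, ?, ?)",
        (table, column, value),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
create_watermark_table(conn)
seed_watermark(conn, *INITIAL_RECORD)
print(conn.execute("SELECT * FROM WatermarkTable").fetchall())
# → [('Sales.Orders', 'LastEditedWhen', '2013-01-01 00:00:00.000')]
```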
In this Data Flow, we include only the records from the source table or query whose watermark column value is greater than the previously stored watermark value. This ensures that only data that is new or updated since the last load is processed.
Source 1
In the source transformation, open "Source options", select "Query", and enter the source query. Then click "Import schema" and, finally, preview the data.
Source 2: watermark table
This source uses a simple query to retrieve data from the `WatermarkTable`. The configuration is similar to Source 1, but with a different query. Later, we can refine this so that only the relevant watermark value for the specific table is selected, using a join on the table name in the `WatermarkTable`.
```sql
SELECT
    [WatermarkTableName]
  , [WatermarkColumn]
  , [WatermarkValue]
FROM [dbo].[WatermarkTable]
```
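The refinement mentioned above, picking only the watermark row that belongs to one source table, can be sketched as a parameterized lookup. This is a Python/`sqlite3` illustration under assumed data; the `get_watermark` helper and the two sample rows are hypothetical. Both sample tables deliberately share the watermark column name `LastEditedWhen`, which is why the lookup keys on the table name rather than the column name.

```python
import sqlite3
from typing import Optional


def get_watermark(conn: sqlite3.Connection, table_name: str) -> Optional[str]:
    """Return the stored watermark for one source table, or None if it has no row."""
    row = conn.execute(
        "SELECT WatermarkValue FROM WatermarkTable WHERE WatermarkTableName = ?",
        (table_name,),
    ).fetchone()
    return row[0] if row else None


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE WatermarkTable (WatermarkTableName TEXT, WatermarkColumn TEXT, WatermarkValue TEXT)"
)
# Hypothetical rows: two tables that share the same watermark column name.
conn.executemany(
    "INSERT INTO WatermarkTable VALUES (?, ?, ?)",
    [
        ("Sales.Orders", "LastEditedWhen", "2016-05-31 12:00:00.000"),
        ("Sales.Invoices", "LastEditedWhen", "2016-05-30 09:00:00.000"),
    ],
)
print(get_watermark(conn, "Sales.Orders"))  # → 2016-05-31 12:00:00.000
```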
Use Derived Column:
Watermark values can have different data types, which is why we store them as `nvarchar` in the `WatermarkTable`. In this case, the watermark is of type `datetime`, so we need to convert it appropriately to ensure a successful join with the watermark table. This conversion is handled using the expression language within Mapping Data Flow.
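In the data flow itself this conversion is written in the expression language (for example with a function such as `toTimestamp()`); the Python sketch below only illustrates the idea: parse the `nvarchar` watermark text into a real datetime so comparisons are chronological, and format it back to the same text. The storage format `yyyy-MM-dd HH:mm:ss.fff` is an assumption; use whatever format your `WatermarkValue` actually holds.

```python
from datetime import datetime

# Assumed text format of WatermarkValue (millisecond precision); adjust to your stored format.
WATERMARK_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def watermark_to_datetime(value: str) -> datetime:
    """Convert the nvarchar watermark text into a datetime for comparison."""
    return datetime.strptime(value, WATERMARK_FORMAT)


def datetime_to_watermark(value: datetime) -> str:
    """Format a datetime back into watermark text, trimming microseconds to milliseconds."""
    return value.strftime(WATERMARK_FORMAT)[:-3]


dt = watermark_to_datetime("2016-05-31 12:00:00.123")
print(dt)                         # → 2016-05-31 12:00:00.123000
print(datetime_to_watermark(dt))  # → 2016-05-31 12:00:00.123
```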
Use Join Transformation:
To combine data from different sources, we use a join operation. Options include 'Full Outer', 'Inner', 'Left Outer', 'Right Outer', or 'Cross' join. In this scenario, we want to ensure the correct watermark value is applied for the incremental load of a specific table. We're using a Left Outer Join, but an Inner Join would also work since all records refer to the same table. However, joining on the table name from the watermark table is the more future-proof approach: a join based solely on the watermark column could cause issues when multiple tables share the same watermark column name.
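The left outer join on table name can be pictured with plain Python dictionaries. In this sketch the source side's table name is passed in as a parameter (in the data flow it would have to come from a column on the source stream), and the helper name, column set, and sample rows are hypothetical. Every source row is kept; when no watermark row matches, the watermark columns are `None`, which is what distinguishes a left outer join from an inner join.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]

# Columns contributed by the watermark side; filled with None when nothing matches.
WATERMARK_COLUMNS = ("WatermarkTableName", "WatermarkColumn", "WatermarkValue")


def left_outer_join_on_table(source_rows: List[Row], source_table: str, watermark_rows: List[Row]) -> List[Row]:
    """Keep every source row and attach the watermark row whose table name matches.

    Assumes at most one watermark row per table name.
    """
    lookup = {w["WatermarkTableName"]: w for w in watermark_rows}
    match = lookup.get(source_table)
    empty = {col: None for col in WATERMARK_COLUMNS}
    return [{**row, **(match if match is not None else empty)} for row in source_rows]


orders = [
    {"OrderID": 1, "LastEditedWhen": "2016-05-30 08:00:00.000"},
    {"OrderID": 2, "LastEditedWhen": "2016-06-01 10:15:00.000"},
]
watermarks = [
    {"WatermarkTableName": "Sales.Orders", "WatermarkColumn": "LastEditedWhen", "WatermarkValue": "2016-05-31 12:00:00.000"},
    {"WatermarkTableName": "Sales.Invoices", "WatermarkColumn": "LastEditedWhen", "WatermarkValue": "2016-05-30 09:00:00.000"},
]
joined = left_outer_join_on_table(orders, "Sales.Orders", watermarks)
print([(r["OrderID"], r["WatermarkValue"]) for r in joined])
```

Joining on `WatermarkColumn` instead would match both watermark rows here, since both tables use `LastEditedWhen`.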
Use Filter Transformation:
The Join above matches rows on the table name only, so we add a "Filter" transformation afterward to include only the records where the value from the source table or query is greater than the latest watermark value.
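The filter condition amounts to "watermark column strictly greater than the stored watermark". The Python sketch below applies it to already-joined rows; the timestamp format, helper name, and sample rows are assumptions, and dropping rows that found no watermark is a choice of this sketch, not necessarily what your data flow should do.

```python
from datetime import datetime
from typing import Any, Dict, List

# Assumed text format shared by LastEditedWhen and WatermarkValue in this sketch.
FMT = "%Y-%m-%d %H:%M:%S.%f"


def filter_newer_than_watermark(rows: List[Dict[str, Any]], column: str = "LastEditedWhen") -> List[Dict[str, Any]]:
    """Keep rows whose watermark column is strictly greater than the joined WatermarkValue."""
    kept = []
    for row in rows:
        if row["WatermarkValue"] is None:
            continue  # no watermark found for this table; this sketch drops the row
        if datetime.strptime(row[column], FMT) > datetime.strptime(row["WatermarkValue"], FMT):
            kept.append(row)
    return kept


joined = [
    {"OrderID": 1, "LastEditedWhen": "2016-05-30 08:00:00.000", "WatermarkValue": "2016-05-31 12:00:00.000"},
    {"OrderID": 2, "LastEditedWhen": "2016-05-31 12:00:00.000", "WatermarkValue": "2016-05-31 12:00:00.000"},
    {"OrderID": 3, "LastEditedWhen": "2016-06-01 10:15:00.000", "WatermarkValue": "2016-05-31 12:00:00.000"},
]
print([r["OrderID"] for r in filter_newer_than_watermark(joined)])  # → [3]
```

Using a strict `>` skips rows equal to the stored watermark (they were already loaded), but it would also miss a row written later with exactly that timestamp, a known trade-off of timestamp-based watermarks.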
Use Select Transformation:
Use a "Select" transformation to keep only the relevant columns, so that the watermark columns added by the join are not passed on to the sink.
Use Derived Column 2 Transformation:
Before configuring the destination, we add a second "Derived Column" transformation to convert the `LastEditedWhen` column back to a date/time value. This step is necessary for proper mapping to the `Orders_Incremental` table, which follows the original schema of the `Orders` table, where the `LastEditedWhen` column is defined with a date/time data type.
Use Sink
In Azure Data Factory (ADF), the destination is referred to as a "Sink". In this step, we select our target table, `Orders_Incremental`, which shares the same schema as the original `Orders` table. After you create the sink dataset, the columns are mapped automatically. If needed, you can disable Auto mapping to configure the column mappings manually.
Update Watermark
Finally, we update the watermark value to reflect the most recent value, in this case the latest `LastEditedWhen` date. To do this, we use a simple stored procedure that performs the update; it can be run, for example, from a Stored Procedure activity that follows the Data Flow activity in the pipeline.
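```sql
CREATE PROCEDURE [dbo].[usp_UpdateWatermark]
    @tableName nvarchar(255)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @watermarkValue nvarchar(255);

    -- Convert explicitly with style 121 (yyyy-mm-dd hh:mi:ss.mmm) so the stored text is predictable.
    -- An implicit conversion of a datetime column would give 'mon dd yyyy hh:miAM', dropping the seconds.
    -- The target table is hard-coded; @tableName only selects which watermark row to update.
    SELECT @watermarkValue = CONVERT(nvarchar(255), MAX([LastEditedWhen]), 121)
    FROM [Sales].[Orders_Incremental];

    -- Leave the watermark unchanged if the target table is empty (MAX returns NULL)
    IF @watermarkValue IS NOT NULL
        UPDATE [dbo].[WatermarkTable]
        SET [WatermarkValue] = @watermarkValue
        WHERE [WatermarkTableName] = @tableName;
END
GO
```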