Copy multiple tables incrementally with a
When you need to copy data from many source tables into Unity Catalog tables on a schedule, copying all rows on every run is slow and expensive. Use a watermark to track the last-processed row for each table and copy only new rows on each run.
This tutorial shows you how to build a metadata-driven job that:
- Stores the list of source tables and their watermark state in a Delta control table
- Uses a For each task to process each table in parallel
- Copies only rows added since the last successful run
- Updates the watermark after each successful copy
How it works
The job uses three task types wired together in sequence:
| Task | Type | What it does |
|---|---|---|
| read_watermarks | SQL | Reads the watermark control table and returns one row per source table |
| copy_tables | For each | Iterates over {{tasks.read_watermarks.output.rows}}, running the nested task once per source table |
| copy_incremental (nested) | Notebook | Reads rows added since the last watermark, writes them to the target table, and advances the watermark |
The SQL task output—a JSON array of row objects—flows into the For each task's Inputs field using {{tasks.read_watermarks.output.rows}}. The nested notebook receives source_table, target_table, watermark_column, and last_watermark for each iteration.
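The control flow described above can be sketched in plain Python, with in-memory lists standing in for the Delta tables. This is an illustration only: `run_incremental_copy` and the sample data are hypothetical, and in the real job the For each task drives the loop while Spark performs the copy.

```python
from datetime import datetime

def run_incremental_copy(control_rows, sources, targets):
    """Simulate one job run: copy rows newer than each table's watermark."""
    for entry in control_rows:  # the For each task iterates over these rows
        column = entry["watermark_column"]
        last = entry["last_watermark"]
        # Keep only rows added since the last successful run
        new_rows = [r for r in sources[entry["source_table"]] if r[column] > last]
        if not new_rows:
            continue  # nothing to copy, watermark unchanged
        targets.setdefault(entry["target_table"], []).extend(new_rows)
        # Advance the watermark to the newest value just copied
        entry["last_watermark"] = max(r[column] for r in new_rows)
    return control_rows

# Hypothetical sample data standing in for the control and source tables
control = [{
    "source_table": "sales.raw_orders",
    "target_table": "sales.orders",
    "watermark_column": "updated_at",
    "last_watermark": datetime(1970, 1, 1),
}]
sources = {"sales.raw_orders": [
    {"id": 1, "updated_at": datetime(2024, 6, 1)},
    {"id": 2, "updated_at": datetime(2024, 6, 2)},
]}
targets = {}

run_incremental_copy(control, sources, targets)  # first run: full load
sources["sales.raw_orders"].append({"id": 3, "updated_at": datetime(2024, 6, 3)})
run_incremental_copy(control, sources, targets)  # second run: only id 3 is copied
```

The second call copies a single row because the first call moved the watermark forward, which is the behavior the job relies on.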
Prerequisites
- An Azure Databricks workspace with permission to create jobs and notebooks
- Permission to create tables in Unity Catalog
- A Unity Catalog schema where you can create the control table and target tables (for example, config)
- A SQL warehouse to run SQL tasks
- Source tables that contain a monotonically increasing column such as a timestamp or integer sequence
Step 1: Create the watermark control table
The watermark control table is the source of truth for which tables to process and how far each table has been copied. Each row represents one source table.
Run the following SQL to create the control table and register two source tables:
CREATE OR REPLACE TABLE config.watermarks (
  source_table STRING NOT NULL,
  target_table STRING NOT NULL,
  watermark_column STRING NOT NULL,
  last_watermark TIMESTAMP NOT NULL
);
INSERT INTO config.watermarks VALUES
  ('sales.raw_orders', 'sales.orders', 'updated_at', '1970-01-01'),
  ('sales.raw_customers', 'sales.customers', 'updated_at', '1970-01-01');
Setting last_watermark to 1970-01-01 on the first run causes the notebook to copy all existing rows, acting as an initial full load. Subsequent runs copy only rows added or updated after the previous run.
Step 2: Write the copy notebook
The notebook runs once per table iteration. It reads the watermark, filters the source, writes to the target, and advances the watermark.
Create a notebook at a path such as /Workspace/Users/<username>/copy_incremental and add the following code:
from pyspark.sql.functions import max as spark_max
# Set defaults for running the notebook outside a job
dbutils.widgets.text("source_table", "sales.raw_orders", "Source table")
dbutils.widgets.text("target_table", "sales.orders", "Target table")
dbutils.widgets.text("watermark_column", "updated_at", "Watermark column")
dbutils.widgets.text("last_watermark", "1970-01-01", "Last watermark")
source_table = dbutils.widgets.get("source_table")
target_table = dbutils.widgets.get("target_table")
watermark_column = dbutils.widgets.get("watermark_column")
last_watermark = dbutils.widgets.get("last_watermark")
# Read only new rows from the source table
new_rows = spark.table(source_table).filter(
    f"{watermark_column} > '{last_watermark}'"
)
row_count = new_rows.count()
print(f"Copying {row_count} new rows from {source_table}")
if row_count > 0:
    # Append new rows to the target table, creating it if it does not exist
    new_rows.write.format("delta").mode("append").saveAsTable(target_table)
    # Compute the new high-water mark from the rows just written
    new_watermark = new_rows.agg(spark_max(watermark_column)).collect()[0][0]
    # Advance the watermark so the next run starts from here
    spark.sql(f"""
        UPDATE config.watermarks
        SET last_watermark = CAST('{new_watermark}' AS TIMESTAMP)
        WHERE source_table = '{source_table}'
    """)
    print(f"Watermark for {source_table} advanced to {new_watermark}")
else:
    print(f"No new rows for {source_table}, watermark unchanged")
The dbutils.widgets.text() defaults let you run and test the notebook directly. When the notebook runs inside the For each task, the job overrides these defaults with the actual values for each iteration.
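Because the notebook interpolates widget values directly into SQL strings, it can help to validate them first. The following is a minimal sketch, not part of the original notebook: `validate_table_name` is a hypothetical helper, and the pattern assumes table names made of one to three dot-separated parts containing only letters, digits, and underscores.

```python
import re

# Assumption: table names use one to three dot-separated simple identifiers
_TABLE_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*){0,2}")

def validate_table_name(name: str) -> str:
    """Return the name unchanged if it looks like a plain table identifier."""
    if not _TABLE_NAME.fullmatch(name):
        raise ValueError(f"Unexpected table name: {name!r}")
    return name

print(validate_table_name("sales.raw_orders"))
```

In the notebook, you could pass each table name returned by dbutils.widgets.get() through a check like this before building the filter or the UPDATE statement.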
Note
This notebook uses append mode, which is suitable when the source contains only inserts. If your source contains updates, use a MERGE statement instead of write.mode("append") to upsert rows into the target table. See Upsert into a Delta Lake table using merge for merge syntax.
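If you switch to MERGE, the statement needs a key that identifies matching rows, which the control table in this tutorial does not store. The sketch below only builds the statement text. `build_merge_sql`, the `id` key column, and the `new_rows` view name are assumptions made for illustration.

```python
def build_merge_sql(target_table, source_view, key_columns):
    """Build a Delta MERGE statement that upserts source_view into target_table."""
    # Match rows on every key column
    on_clause = " AND ".join(f"t.{c} = s.{c}" for c in key_columns)
    return (
        f"MERGE INTO {target_table} AS t\n"
        f"USING {source_view} AS s\n"
        f"ON {on_clause}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )

# Hypothetical key column and view name
print(build_merge_sql("sales.orders", "new_rows", ["id"]))
```

In the notebook, you would register the filtered DataFrame with new_rows.createOrReplaceTempView("new_rows") and pass the generated text to spark.sql(). Unlike append mode with saveAsTable, MERGE requires the target table to exist already.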
Step 3: Create the job
In your Azure Databricks workspace, click Workflows in the sidebar, then click Create job. Give the job a name such as Incremental table copy.
Step 4: Configure the watermark lookup task
The SQL task reads the control table and makes the result available to the For each task.
- Click Add task.
- Set Task name to read_watermarks.
- Set Type to SQL.
- In the SQL field, enter:
SELECT source_table, target_table, watermark_column, last_watermark FROM config.watermarks
- Set SQL warehouse to a warehouse in your workspace.
- Click Create task.
When this task runs, Azure Databricks captures the result as a JSON array in tasks.read_watermarks.output.rows:
[
  {
    "source_table": "sales.raw_orders",
    "target_table": "sales.orders",
    "watermark_column": "updated_at",
    "last_watermark": "2024-06-01T12:00:00.000Z"
  },
  {
    "source_table": "sales.raw_customers",
    "target_table": "sales.customers",
    "watermark_column": "updated_at",
    "last_watermark": "2024-06-01T12:00:00.000Z"
  }
]
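Each object in this array becomes the input for one iteration. As a rough illustration in plain Python, run outside Azure Databricks, the snippet below parses the same payload and converts the watermark string to a datetime. The variable names are hypothetical, and the timestamp format is assumed to match the sample above.

```python
import json
from datetime import datetime

# Sample payload copied from the task output shown above
rows_json = """
[
  {"source_table": "sales.raw_orders", "target_table": "sales.orders",
   "watermark_column": "updated_at", "last_watermark": "2024-06-01T12:00:00.000Z"},
  {"source_table": "sales.raw_customers", "target_table": "sales.customers",
   "watermark_column": "updated_at", "last_watermark": "2024-06-01T12:00:00.000Z"}
]
"""

rows = json.loads(rows_json)
for row in rows:
    # Replace the trailing "Z" so fromisoformat also works before Python 3.11
    watermark = datetime.fromisoformat(row["last_watermark"].replace("Z", "+00:00"))
    print(row["source_table"], "->", row["target_table"], "since", watermark)
```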
Step 5: Configure the For each task
The For each task reads the SQL output and launches one nested task run per source table.
- Click Add task and set Depends on to read_watermarks.
- Set Task name to copy_tables.
- Set Type to For each.
- In the Inputs field, enter:
{{tasks.read_watermarks.output.rows}}
- Set Concurrency to 2 to copy two tables at a time. Increase this value if your warehouse can support higher parallelism.
- Click Add a task to loop over to configure the nested task.
- Set Task name to copy_incremental.
- Set Type to Notebook.
- Set Path to the path of the notebook you created in Step 2.
- Click Parameters, then click Add to add each of the following parameters:
| Key | Value |
|---|---|
| source_table | {{input.source_table}} |
| target_table | {{input.target_table}} |
| watermark_column | {{input.watermark_column}} |
| last_watermark | {{input.last_watermark}} |
Each {{input.<key>}} reference resolves to the corresponding field from the current iteration's row.
- Click Create task.
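To make the parameter mapping concrete, here is a small simulation of how an {{input.<key>}} reference could be resolved against one row. This is not how Azure Databricks implements the substitution. `resolve_parameters` is a hypothetical helper that only mimics the result.

```python
import re

def resolve_parameters(parameters, row):
    """Replace each {{input.<key>}} reference with the matching field of row."""
    pattern = re.compile(r"\{\{input\.(\w+)\}\}")
    return {
        key: pattern.sub(lambda m: str(row[m.group(1)]), value)
        for key, value in parameters.items()
    }

# Parameters as entered in the nested task
parameters = {
    "source_table": "{{input.source_table}}",
    "target_table": "{{input.target_table}}",
    "watermark_column": "{{input.watermark_column}}",
    "last_watermark": "{{input.last_watermark}}",
}
# One row from the SQL task output
row = {
    "source_table": "sales.raw_orders",
    "target_table": "sales.orders",
    "watermark_column": "updated_at",
    "last_watermark": "2024-06-01T12:00:00.000Z",
}
resolved = resolve_parameters(parameters, row)
print(resolved["source_table"])
```

The resolved dictionary corresponds to the widget values the notebook reads with dbutils.widgets.get() in that iteration.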
Step 6: Run the job and verify
- Click Run now to trigger the job.
- On the job run page, click the copy_tables node to expand the For each task.
- The run page shows a table of iterations, one row per source table, each displaying its status, start time, and duration.
- Click any iteration to view the notebook output and confirm the row count and watermark update.
To confirm the watermark advanced, run the following query after the job completes:
SELECT source_table, last_watermark FROM config.watermarks;
Each last_watermark value should now reflect the timestamp of the most recently copied row. If a value is still 1970-01-01, either the source table contained no rows matching the filter or the copy task encountered an error. Check the task run output for details.
Extend the pattern
Add a new source table: Insert a row into the control table. The next job run picks it up automatically, starting with a full load from 1970-01-01:
INSERT INTO config.watermarks VALUES
('sales.raw_products', 'sales.products', 'updated_at', '1970-01-01');
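Pause a table: Add an active column, set it to TRUE for the existing rows, and filter on it in the SQL task. Rows that predate the new column hold NULL until you update them, so the active = TRUE filter would otherwise skip them. To pause a table, set active to FALSE in its row:
ALTER TABLE config.watermarks ADD COLUMN active BOOLEAN;
UPDATE config.watermarks SET active = TRUE;
-- In the SQL task:
SELECT source_table, target_table, watermark_column, last_watermark
FROM config.watermarks
WHERE active = TRUE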
Backfill a table: Reset its watermark to re-copy from a specific point:
UPDATE config.watermarks
SET last_watermark = '2024-01-01'
WHERE source_table = 'sales.raw_orders';
Additional resources
- Use a For each task to run another task in a loop - Full reference for configuring For each tasks, including concurrency options
- Use a control table to drive a For each job - Drive a For each job from a live config table
- Upsert into a Delta Lake table using merge - Use MERGE to upsert rows when source data includes updates
- The AUTO CDC APIs: Simplify change data capture with pipelines - Use change data capture for sources that track inserts, updates, and deletes