Important
REPLACE WHERE flows are in Beta.
This page describes how to use REPLACE WHERE flows in Lakeflow Spark Declarative Pipelines to recompute and overwrite a targeted subset of a table without reprocessing your entire table history. REPLACE WHERE flows handle late-arriving data, upstream reprocessing, schema evolution, and backfills.
With a REPLACE WHERE flow, you define a predicate on the target table. All rows matching the predicate are deleted and replaced by re-evaluating the source query for that same predicate range. Rows that don't match the predicate are left untouched.
Requirements
REPLACE WHERE flows have the following requirements:
- Your pipeline must use the PREVIEW channel.
- Databricks recommends Unity Catalog and serverless compute. Incremental refresh is only supported on serverless compute.
When to use REPLACE WHERE flows
Use REPLACE WHERE flows for the following scenarios:
- Incremental batch processing without streaming semantics: Process new rows in batches without managing streaming concepts such as watermarks.
- Selective reprocessing: Recompute only rows that match a predicate while leaving all other rows untouched.
- Scenarios beyond standard materialized view capabilities:
- Target tables with longer retention than the source
- Preventing recomputation when a dimension table changes
- Schema evolution without recomputing entire history
Create a REPLACE WHERE flow
Define REPLACE WHERE flows in either SQL or Python.
SQL
Use the FLOW REPLACE WHERE clause inline with CREATE STREAMING TABLE:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Alternatively, use the long-form CREATE FLOW syntax:
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
In Python, the table and the flow are defined in a single statement. The flow inherits the same name as the table:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
    replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
    # product_id must be selected so the join key is available.
    orders_fct = spark.read.table("orders_fct").select("date", "order_id", "product_id", "region", "qty", "price")
    product_dim = spark.read.table("product_dim")
    return orders_fct.join(product_dim, "product_id")
The replace_where parameter accepts either a PySpark column expression or a string predicate.
In these examples, all rows from the last 7 days are deleted from orders_enriched and recomputed using the source query. You don't need to add the predicate to the source query. The pipeline engine automatically applies it when reading from the source.
Note
BY NAME is required in SQL. It matches columns by name rather than position.
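The delete-and-replace behavior described above can be sketched in plain Python. This is a simplified model using row dictionaries and an illustrative cutoff date, not the actual engine, which operates on Delta tables:

```python
from datetime import date

def replace_where(target, source_rows, predicate):
    # Rows matching the predicate are deleted from the target and
    # replaced by re-evaluating the source for that same range;
    # rows that don't match are left untouched.
    kept = [r for r in target if not predicate(r)]
    recomputed = [r for r in source_rows if predicate(r)]
    return kept + recomputed

cutoff = date(2026, 1, 25)
in_window = lambda r: r["date"] >= cutoff

target = [
    {"date": date(2026, 1, 10), "qty": 1},  # outside the window: untouched
    {"date": date(2026, 1, 28), "qty": 2},  # inside the window: stale
]
source = [{"date": date(2026, 1, 28), "qty": 5}]  # corrected value

result = replace_where(target, source, in_window)
```

The stale in-window row is replaced by the recomputed source row, while the older row outside the window is carried over unchanged.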
Backfill historical data
To write historical or corrected rows into the target table outside scheduled refreshes, choose between two mechanisms based on where the historical data lives:
- Predicate overrides: Re-run the flow's source query for a one-time predicate range. Use when the historical data comes from the same source as the incremental data.
- DML statements: Insert into the target table directly, bypassing the flow. Use when the historical data lives in a different source than the incremental data.
Predicate overrides
Override the REPLACE WHERE predicate for a single pipeline update without modifying the pipeline definition. Predicate overrides are one-time, apply only to the current update, and don't affect future runs.
Example: Initial historical load
To perform a one-time backfill of historical data when first setting up a pipeline:
pipeline_id = "<pipeline-id>"

overrides = [
    {
        "flow_name": "orders_enriched",
        "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
    }
]

resp = start_update_with_replace_where(
    pipeline_id=pipeline_id,
    replace_where_overrides=overrides,
)
print(resp)
Example: Correct a column for a specific period
After updating a column definition, backfill the change for a targeted historical range:
pipeline_id = "<pipeline-id>"

overrides = [
    {
        "flow_name": "orders_enriched",
        "predicate_override": "date >= date_add(current_date(), -30)",
    }
]

resp = start_update_with_replace_where(
    pipeline_id=pipeline_id,
    replace_where_overrides=overrides,
    refresh_selection=["orders_enriched"],
)
print(resp)
Combine multiple dimensions in a single predicate override:
overrides = [
    {
        "flow_name": "orders_enriched",
        "predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
    }
]
Helper function: start_update_with_replace_where
Use the pipeline update API from a notebook to submit predicate overrides:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
    pipeline_id: str,
    replace_where_overrides: list[dict],
    refresh_selection: list[str] | None = None,
) -> StartUpdateResponse:
    """Start a pipeline update with REPLACE WHERE predicate overrides."""
    client = WorkspaceClient()
    body = {
        "pipeline_id": pipeline_id,
        "cause": "JOB_TASK",
        "update_cause_details": {
            "job_details": {"performance_target": "PERFORMANCE"}
        },
        "replace_where_overrides": replace_where_overrides,
    }
    if refresh_selection:
        body["refresh_selection"] = refresh_selection
    res = client.api_client.do(
        "POST",
        f"/api/2.0/pipelines/{pipeline_id}/updates",
        body=body,
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )
    return StartUpdateResponse.from_dict(res)
DML statements
Run DML statements directly on the target table from outside the pipeline to perform initial loads or corrections, such as loading from a legacy table:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Rows inserted through DML are not subject to the REPLACE WHERE predicate and persist across scheduled refreshes unless they fall within the predicate range of a future run.
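This persistence rule can be illustrated with a small plain-Python model. The row dictionaries and dates are illustrative, not the actual engine behavior on Delta tables:

```python
from datetime import date

def scheduled_refresh(target, source_rows, predicate):
    # Each scheduled refresh replaces only predicate-matching rows.
    return [r for r in target if not predicate(r)] + \
           [r for r in source_rows if predicate(r)]

window_start = date(2026, 1, 25)
in_window = lambda r: r["date"] >= window_start

# Row loaded by an external INSERT, far outside the replace window:
legacy_row = {"date": date(2020, 6, 1), "origin": "legacy"}
target = [legacy_row]
source = [{"date": date(2026, 1, 28), "origin": "flow"}]

# Run several scheduled refreshes; the legacy row is never touched,
# because it never falls inside the predicate range.
for _ in range(3):
    target = scheduled_refresh(target, source, in_window)
```

Only a full refresh, or a future run whose predicate covers the legacy date, would remove that row.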
Full refresh behavior
A full refresh of a REPLACE WHERE flow re-executes the source query using only the current predicate. Rows that were inserted by predicate overrides or DML statements outside the current predicate range are permanently deleted.
Warning
A full refresh clears all existing data and re-executes the flow using only its defined predicate. If a pipeline has been running for a year with a 7-day predicate, a full refresh results in the table containing only the last 7 days of data. All older rows are permanently deleted.
To prevent full refreshes on a table, set the table property pipelines.reset.allowed to false. See Pipeline properties reference.
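The effect described in the warning can be modeled in a few lines of plain Python. The dates and the 7-day window are illustrative:

```python
from datetime import date, timedelta

def full_refresh(source_rows, predicate):
    # A full refresh clears the target entirely, then re-executes the
    # source query restricted to the flow's defined predicate.
    return [r for r in source_rows if predicate(r)]

today = date(2026, 2, 1)
window_start = today - timedelta(days=7)

# A year of daily source rows, only some of which are still in the window.
source = [{"date": today - timedelta(days=d)} for d in range(365)]

target_after = full_refresh(source, lambda r: r["date"] >= window_start)
```

After the full refresh, only the rows inside the 7-day window remain; everything older is gone.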
Incremental refresh
REPLACE WHERE flows use incremental refresh when possible, reprocessing only the source data that has changed since the last refresh rather than recomputing the entire replace window. Incremental refresh requires serverless compute.
When incremental refresh applies
All of the following must be true:
- The pipeline runs on serverless compute.
- The query shape is supported. See Incremental refresh for the supported operator set.
- The predicate references base columns from a source table. Predicates on derived values, such as aggregate or window function outputs, cannot be pushed to a source, which disables incremental refresh.
- No external DML has modified rows in the current replace window. DML that modifies rows outside the current window is not affected.
- The current replace window does not include rows that the previous predicate excluded. If you widen the predicate to cover a range not previously processed, that one refresh falls back to full recomputation. Subsequent refreshes are eligible for incremental refresh again.
- The predicate is deterministic. Predicates using non-deterministic functions such as rand() disable incremental refresh. Temporal functions such as current_date() are allowed.
The first refresh of any flow is always a full computation. If any condition is not met, that refresh falls back to full recomputation of the current replace window.
Best practices for incremental refresh
Follow these guidelines so REPLACE WHERE flows remain eligible for incremental refresh.
Use a moving lower bound
Predicates with a moving lower bound remain eligible for incremental refresh indefinitely.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
A moving upper bound, such as date BETWEEN date_add(current_date(), -7) AND current_date(), can shift the window to include previously excluded rows, triggering a one-off fallback to full recomputation.
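A plain-Python sketch of why the two predicate shapes behave differently (the dates are illustrative; this models only the window arithmetic, not the engine):

```python
from datetime import date, timedelta

def between_window(run_date):
    # date BETWEEN date_add(current_date(), -7) AND current_date()
    return (run_date - timedelta(days=7), run_date)

def lower_bound_window(run_date):
    # date >= date_add(current_date(), -7): no upper bound
    return (run_date - timedelta(days=7), date.max)

prev_run, this_run = date(2026, 1, 31), date(2026, 2, 1)

# With BETWEEN, today's window reaches past yesterday's upper bound, so it
# covers rows the previous predicate excluded -> one-off full recomputation.
gains_rows_between = between_window(this_run)[1] > between_window(prev_run)[1]

# With only a moving lower bound, the window never gains rows the previous
# window excluded: it only shrinks at the bottom.
gains_rows_lower = lower_bound_window(this_run)[1] > lower_bound_window(prev_run)[1]
```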
Include the predicate column in GROUP BY
When aggregating, include the predicate column in GROUP BY so the engine can push the predicate below the aggregation.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
If the predicate column is missing from GROUP BY, the predicate cannot be pushed below the aggregation and the source is scanned in full.
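Why this pushdown is safe can be shown with a toy aggregation in plain Python. Because every row in a group shares the same value for a GROUP BY column, filtering on that column before aggregating gives the same result as filtering afterwards (hypothetical rows, illustrative predicate):

```python
from collections import defaultdict

def aggregate(rows, keys):
    # SUM(amount) grouped by the given key columns.
    out = defaultdict(int)
    for r in rows:
        out[tuple(r[k] for k in keys)] += r["amount"]
    return dict(out)

rows = [
    {"date": 1, "region": "eu", "amount": 10},
    {"date": 1, "region": "us", "amount": 5},
    {"date": 2, "region": "eu", "amount": 7},
]
pred = lambda r: r["date"] >= 2  # predicate on a GROUP BY column

# Pushed-down: filter the source first, then aggregate.
pushed = aggregate([r for r in rows if pred(r)], ("date", "region"))
# Not pushed: aggregate everything, then filter the result.
post = {k: v for k, v in aggregate(rows, ("date", "region")).items() if k[0] >= 2}
```

If date were not a group key, rows with different date values could land in the same group, and the two orders of operations would no longer agree, which is why the engine must scan the source in full.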
Include the predicate column in join keys
Include the predicate column in the join condition so the engine can prune all joined sources.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
If a joined table does not expose the predicate column, that table is scanned in full on each refresh.
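The same equivalence holds for joins, sketched here with a toy nested-loop join in plain Python (hypothetical rows; the real engine prunes file scans rather than Python lists). When the predicate column is part of the equi-join condition, pruning both inputs first matches joining everything and filtering afterwards:

```python
fact = [{"date": 1, "user": "a", "rev": 10}, {"date": 2, "user": "b", "rev": 5}]
dim = [{"date": 1, "user": "a", "region": "eu"}, {"date": 2, "user": "b", "region": "us"}]

def join(f_rows, d_rows):
    # Equi-join on both date and user, mirroring f.date = d.date AND f.user_id = d.user_id.
    return [
        {**f, "region": d["region"]}
        for f in f_rows
        for d in d_rows
        if f["date"] == d["date"] and f["user"] == d["user"]
    ]

keep = lambda r: r["date"] >= 2

# Because date is a join key, pruning both sides first gives the same
# result as joining everything and filtering the output.
pruned = join([r for r in fact if keep(r)], [r for r in dim if keep(r)])
full = [r for r in join(fact, dim) if keep(r)]
```

If the dimension table carried no date column, only the fact side could be pruned and the dimension would be scanned in full on every refresh.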
Diagnose fallback to full recomputation
When a refresh falls back to full recomputation, the reason is reported in the planning_information event for the flow. See Monitor pipeline event logs. The following table lists the reasons reported in the event:
| Reason | Meaning |
|---|---|
| EXTERNAL_CHANGE_IN_REPLACE_WINDOW | An external DML modified rows in the current replace window. |
| REPLACE_WHERE_NOT_DETERMINISTIC | The predicate uses non-deterministic expressions. |
| PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC | The previous refresh used a non-deterministic predicate. |
| UNSUPPORTED_REPLACE_WHERE_PREDICATE | The predicate cannot be pushed to any source, the current window includes rows not processed by the previous predicate, or the run uses a predicate override. |
Limitations
REPLACE WHERE flows have the following limitations:
- The target table must be created within the pipeline.
- Only one REPLACE WHERE flow is allowed per target table.
- A table targeted by a REPLACE WHERE flow can't also be targeted by another flow type, such as an AUTO CDC flow or an append flow.
- Expectations are not supported on tables targeted by REPLACE WHERE flows.
- For streaming tables created in Databricks SQL, see REPLACE WHERE flows for standalone streaming tables for syntax and backfill differences.
Examples
The following examples show common REPLACE WHERE flow patterns.
Example 1: Keep historical aggregates from a limited-retention source
This example maintains daily aggregates indefinitely, even after raw data ages out of the source table (3-day retention):
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
    replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
    return (
        spark.read.table("events_raw")
        .groupBy("date", "key")
        .agg(F.sum("val").alias("agg"))
    )
Example 2: Prevent recomputation when a dimension table changes
This example keeps historical fact rows unchanged when dimension attributes change:
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
    replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
    fact_table = spark.read.table("fact_table").alias("f")
    dim_users = spark.read.table("dim_users").alias("d")
    return (
        fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
        .select(
            col("f.date"),
            col("f.user_id"),
            col("d.region"),
            col("f.revenue"),
        )
    )
If a user's region changes, only recent rows are recomputed. Historical rows retain the region value at the time they were written. To correct historical rows, run a targeted backfill using predicate overrides.
Example 3: Add a new metric without recomputing full history
This example shows how to evolve a table definition and backfill only a targeted range:
Define the initial table:
SQL
CREATE STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks
FROM clickstream_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
    replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def clickstream_daily():
    return (
        spark.read.table("clickstream_raw")
        .groupBy("event_date", "page_id")
        .agg(F.count("*").alias("clicks"))
    )
Update the query to add uniq_users:
SQL
CREATE STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks,
COUNT(DISTINCT user_id) AS uniq_users
FROM clickstream_raw
GROUP BY ALL;
Python
@dp.table(
    replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def clickstream_daily():
    return (
        spark.read.table("clickstream_raw")
        .groupBy("event_date", "page_id")
        .agg(
            F.count("*").alias("clicks"),
            F.countDistinct("user_id").alias("uniq_users"),
        )
    )
Backfill the new metric for the last 30 days:
overrides = [
    {
        "flow_name": "clickstream_daily",
        "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
    }
]
resp = start_update_with_replace_where(
    pipeline_id="<pipeline-id>",
    replace_where_overrides=overrides,
    refresh_selection=["clickstream_daily"],
)
Rows older than the backfilled range contain NULL for uniq_users.
Example 4: Iterate on a small window before backfilling full history
This example shows how to validate query logic on a small data window before processing the full historical range.
Start with a short window so each refresh recomputes only the last 7 days while you revise the query:
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
    replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
    return (
        spark.read.table("marketing_events")
        .groupBy("event_date", "campaign_id")
        .agg(F.sum("revenue").alias("total_revenue"))
    )
Once the query is finalized, use a predicate override to perform a one-time historical backfill:
overrides = [
    {
        "flow_name": "revenue_attribution",
        "predicate_override": "event_date >= date_add(current_date(), -365)",
    }
]
resp = start_update_with_replace_where(
    pipeline_id="<pipeline-id>",
    replace_where_overrides=overrides,
    refresh_selection=["revenue_attribution"],
)