Batch processing with REPLACE WHERE flows

Important

REPLACE WHERE flows are in Beta.

This page describes how to use REPLACE WHERE flows in Lakeflow Spark Declarative Pipelines to recompute and overwrite a targeted subset of a table without reprocessing your entire table history. REPLACE WHERE flows handle late-arriving data, upstream reprocessing, schema evolution, and backfills.

With a REPLACE WHERE flow, you define a predicate on the target table. All rows matching the predicate are deleted and replaced by re-evaluating the source query for that same predicate range. Rows that don't match the predicate are left untouched.
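For example, with the predicate date >= date_add(current_date(), -7), each refresh deletes the most recent seven days of rows from the target and rebuilds them from the source query.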

Requirements

REPLACE WHERE flows have the following requirements:

  • Your pipeline must use the PREVIEW channel.
  • Databricks recommends Unity Catalog and serverless compute. Incremental refresh is only supported on serverless compute.

When to use REPLACE WHERE flows

Use REPLACE WHERE flows for the following scenarios:

  • Incremental batch processing without streaming semantics: Process new rows in batches without managing streaming concepts such as watermarks.
  • Selective reprocessing: Recompute only rows that match a predicate while leaving all other rows untouched.
  • Scenarios beyond standard materialized view capabilities:
    • Target tables with longer retention than the source
    • Preventing recomputation when a dimension table changes
    • Schema evolution without recomputing entire history

Create a REPLACE WHERE flow

Define REPLACE WHERE flows in either SQL or Python.

SQL

Use the FLOW REPLACE WHERE clause inline with CREATE STREAMING TABLE:

CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Alternatively, use the long-form CREATE FLOW syntax:

CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Python

In Python, the table and the flow are defined in a single statement. The flow inherits the same name as the table:

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
  # product_id must be selected so the join key is available
  orders_fct = spark.read.table("orders_fct").select(
    "date", "order_id", "region", "product_id", "qty", "price"
  )
  product_dim = spark.read.table("product_dim")
  return orders_fct.join(product_dim, "product_id")

The replace_where parameter accepts either a PySpark column expression or a string predicate.
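
For example, the same seven-day window expressed as a string predicate (a minimal sketch; the source query is abbreviated from the example above):

@dp.table(
  # Equivalent string form of the column-expression predicate above
  replace_where="date >= date_add(current_date(), -7)"
)
def orders_enriched():
  return spark.read.table("orders_fct").join(spark.read.table("product_dim"), "product_id")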

In these examples, all rows from the last 7 days are deleted from orders_enriched and recomputed using the source query. You don't need to add the predicate to the source query. The pipeline engine automatically applies it when reading from the source.

Note

BY NAME is required in SQL. It matches columns by name rather than position.

Backfill historical data

To write historical or corrected rows into the target table outside scheduled refreshes, choose between two mechanisms based on where the historical data lives:

  • Predicate overrides: Re-run the flow's source query for a one-time predicate range. Use when the historical data comes from the same source as the incremental data.
  • DML statements: Insert into the target table directly, bypassing the flow. Use when the historical data lives in a different source than the incremental data.

Predicate overrides

Override the REPLACE WHERE predicate for a single pipeline update without modifying the pipeline definition. Predicate overrides are one-time, apply only to the current update, and don't affect future runs.

Example: Initial historical load

To perform a one-time backfill of historical data when first setting up a pipeline, call the start_update_with_replace_where helper (defined later on this page):

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
)
print(resp)

Example: Correct a column for a specific period

After updating a column definition, backfill the change for a targeted historical range:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
  refresh_selection=["orders_enriched"],
)
print(resp)
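
The refresh_selection argument limits the update to the listed tables, so other flows in the pipeline are not refreshed during the backfill.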

Combine multiple dimensions in a single predicate override:

overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
  }
]
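
With a combined predicate, only rows matching every condition are deleted and recomputed. In this example, rows outside the asia region are untouched even within the 30-day window.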

Helper function: start_update_with_replace_where

Use the pipeline update API from a notebook to submit predicate overrides:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
  pipeline_id: str,
  replace_where_overrides: list[dict],
  refresh_selection: list[str] | None = None,
) -> StartUpdateResponse:
  """Start a pipeline update with REPLACE WHERE predicate overrides."""
  client = WorkspaceClient()

  body = {
    "pipeline_id": pipeline_id,
    "cause": "JOB_TASK",
    "update_cause_details": {
      "job_details": {"performance_target": "PERFORMANCE"}
    },
    "replace_where_overrides": replace_where_overrides,
  }

  if refresh_selection:
    body["refresh_selection"] = refresh_selection

  res = client.api_client.do(
    "POST",
    f"/api/2.0/pipelines/{pipeline_id}/updates",
    body=body,
    headers={"Accept": "application/json", "Content-Type": "application/json"},
  )

  return StartUpdateResponse.from_dict(res)

DML statements

Run DML statements directly on the target table from outside the pipeline to perform initial loads or corrections, such as loading from a legacy table:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Rows inserted through DML are not subject to the REPLACE WHERE predicate and persist across scheduled refreshes unless they fall within the predicate range of a future run.
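For example, with a 7-day predicate, rows backfilled for dates years in the past are never touched by scheduled refreshes, while a row inserted for yesterday's date is replaced on the next run.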

Full refresh behavior

A full refresh of a REPLACE WHERE flow re-executes the source query using only the current predicate. Rows that were inserted by predicate overrides or DML statements outside the current predicate range are permanently deleted.

Warning

A full refresh clears all existing data and re-executes the flow using only its defined predicate. If a pipeline has been running for a year with a 7-day predicate, a full refresh results in the table containing only the last 7 days of data. All older rows are permanently deleted.

To prevent full refreshes on a table, set the table property pipelines.reset.allowed to false. See Pipeline properties reference.
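
For example, a minimal sketch in Python, assuming the dp.table decorator accepts a table_properties argument as in earlier declarative pipeline APIs:

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 7),
  # Assumed parameter: blocks full refreshes so rows outside the
  # predicate window cannot be permanently deleted
  table_properties={"pipelines.reset.allowed": "false"},
)
def orders_enriched():
  return spark.read.table("orders_fct").join(spark.read.table("product_dim"), "product_id")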

Incremental refresh

REPLACE WHERE flows use incremental refresh when possible, reprocessing only the source data that has changed since the last refresh rather than recomputing the entire replace window. Incremental refresh requires serverless compute.

When incremental refresh applies

All of the following must be true:

  • The pipeline runs on serverless compute.
  • The query shape is supported. See Incremental refresh for the supported operator set.
  • The predicate references base columns from a source table. Predicates on derived values, such as aggregate or window function outputs, cannot be pushed to a source, which disables incremental refresh.
  • No external DML has modified rows in the current replace window. DML that modifies rows outside the current window does not affect eligibility.
  • The current replace window does not include rows that the previous predicate excluded. If you widen the predicate to cover a range not previously processed, that one refresh falls back to full recomputation. Subsequent refreshes are eligible for incremental refresh again.
  • The predicate is deterministic. Predicates using non-deterministic functions such as rand() disable incremental refresh. Temporal functions such as current_date() are allowed; see the sketch below.

The first refresh of any flow is always a full computation. If any condition is not met, that refresh falls back to full recomputation of the current replace window.
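
The following sketch contrasts predicate shapes against these conditions (illustrative string predicates, written in the form accepted by replace_where):

# Eligible: deterministic, and temporal functions are allowed
replace_where_ok = "date >= date_add(current_date(), -7)"

# Not eligible: rand() is non-deterministic, so every refresh falls
# back to full recomputation of the replace window
replace_where_nondeterministic = "date >= date_add(current_date(), -7) AND rand() > 0.5"

# Not eligible: a predicate on a derived value, such as an aggregate
# output, cannot be pushed to a source table
replace_where_derived = "total >= 100"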

Best practices for incremental refresh

Follow these guidelines so REPLACE WHERE flows remain eligible for incremental refresh.

Use a moving lower bound

Predicates with a moving lower bound remain eligible for incremental refresh indefinitely.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

A moving upper bound, such as date BETWEEN date_add(current_date(), -7) AND current_date(), can shift the window to include previously excluded rows, triggering a one-off fallback to full recomputation.

Include the predicate column in GROUP BY

When aggregating, include the predicate column in GROUP BY so the engine can push the predicate below the aggregation.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

If the predicate column is missing from GROUP BY, the predicate cannot be pushed below the aggregation and the source is scanned in full.

Include the predicate column in join keys

Include the predicate column in the join condition so the engine can prune all joined sources.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

If a joined table does not expose the predicate column, that table is scanned in full on each refresh.
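Example 2 later on this page shows this trade-off: dim_users exposes no date column, so the dimension table is scanned on each refresh while the fact table is pruned.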

Diagnose fallback to full recomputation

When a refresh falls back to full recomputation, the reason is reported in the planning_information event for the flow. See Monitor pipeline event logs. The event reports one of the following reasons:

  • EXTERNAL_CHANGE_IN_REPLACE_WINDOW: An external DML statement modified rows in the current replace window.
  • REPLACE_WHERE_NOT_DETERMINISTIC: The predicate uses non-deterministic expressions.
  • PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC: The previous refresh used a non-deterministic predicate.
  • UNSUPPORTED_REPLACE_WHERE_PREDICATE: The predicate cannot be pushed to any source, the current window includes rows not processed by the previous predicate, or the run uses a predicate override.
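
As one way to inspect these reasons, query the pipeline event log for planning_information events. The following is a minimal sketch, assuming the event_log table-valued function is available in your workspace and that you substitute your own pipeline ID:

events = spark.sql("""
  SELECT timestamp, details
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'planning_information'
  ORDER BY timestamp DESC
""")
events.show(truncate=False)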

Limitations

REPLACE WHERE flows have the following limitations:

  • The target table must be created within the pipeline.
  • Only one REPLACE WHERE flow is allowed per target table.
  • A table targeted by a REPLACE WHERE flow can't also be targeted by another flow type, such as an AUTO CDC flow or an append flow.
  • Expectations are not supported on tables targeted by REPLACE WHERE flows.
  • For streaming tables created in Databricks SQL, see REPLACE WHERE flows for standalone streaming tables for syntax and backfill differences.

Examples

The following examples show common REPLACE WHERE flow patterns.

Example 1: Keep historical aggregates from a limited-retention source

This example maintains daily aggregates indefinitely, even after raw data ages out of the source table (3-day retention):

SQL

CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
  return (
    spark.read.table("events_raw")
      .groupBy("date", "key")
      .agg(F.sum("val").alias("agg"))
  )

Example 2: Prevent recomputation when a dimension table changes

This example keeps historical fact rows unchanged when dimension attributes change:

SQL

CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
  fact_table = spark.read.table("fact_table").alias("f")
  dim_users = spark.read.table("dim_users").alias("d")
  return (
    fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
      .select(
        col("f.date"),
        col("f.user_id"),
        col("d.region"),
        col("f.revenue"),
      )
  )

If a user's region changes, only recent rows are recomputed. Historical rows retain the region value at the time they were written. To correct historical rows, run a targeted backfill using predicate overrides.
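
For example, a sketch of that targeted backfill using the start_update_with_replace_where helper from earlier on this page (the 365-day range is illustrative):

overrides = [
  {
    "flow_name": "fact_dim_join",
    # Recompute a wider historical range so older rows pick up the
    # current dimension attributes
    "predicate_override": "date >= date_add(current_date(), -365)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id="<pipeline-id>",
  replace_where_overrides=overrides,
  refresh_selection=["fact_dim_join"],
)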

Example 3: Add a new metric without recomputing full history

This example shows how to evolve a table definition and backfill only a targeted range:

  1. Define the initial table:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    from pyspark.sql.functions import col
    
    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(F.count("*").alias("clicks"))
      )
    
  2. Update the query to add uniq_users:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(
            F.count("*").alias("clicks"),
            F.countDistinct("user_id").alias("uniq_users"),
          )
      )
    
  3. Backfill the new metric for a targeted 30-day range:

    overrides = [
      {
        "flow_name": "clickstream_daily",
        "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
      }
    ]
    
    resp = start_update_with_replace_where(
      pipeline_id="<pipeline-id>",
      replace_where_overrides=overrides,
      refresh_selection=["clickstream_daily"],
    )
    

    Rows older than the backfilled range contain NULL for uniq_users.

Example 4: Iterate on a small window before backfilling full history

This example shows how to validate query logic on a small data window before processing the full historical range.

Start with a short window so each refresh recomputes only the last 7 days while you revise the query:

SQL

CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
  return (
    spark.read.table("marketing_events")
      .groupBy("event_date", "campaign_id")
      .agg(F.sum("revenue").alias("total_revenue"))
  )

Once the query is finalized, use a predicate override to perform a one-time historical backfill:

overrides = [
  {
    "flow_name": "revenue_attribution",
    "predicate_override": "event_date >= date_add(current_date(), -365)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id="<pipeline-id>",
  replace_where_overrides=overrides,
  refresh_selection=["revenue_attribution"],
)