REPLACE WHERE flows for standalone streaming tables

Important

REPLACE WHERE flows for standalone streaming tables are in Beta.

This page describes how to use REPLACE WHERE flows to recompute and overwrite a targeted subset of a standalone streaming table without reprocessing your entire table history. REPLACE WHERE flows handle late-arriving data, upstream reprocessing, schema evolution, and backfills.

With a REPLACE WHERE flow, you define a predicate on the target table. All rows matching the predicate are deleted and replaced by re-evaluating the source query for that same predicate range. Rows that do not match the predicate are left untouched.
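Conceptually, each refresh is equivalent to a delete-and-insert pair scoped to the predicate. The following sketch is an illustration only; the engine performs this atomically during refresh, and my_target, my_source, and the date predicate are hypothetical names:

DELETE FROM my_target
WHERE date >= date_add(current_date(), -7);

INSERT INTO my_target
SELECT * FROM my_source
WHERE date >= date_add(current_date(), -7);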

Requirements

REPLACE WHERE flows have the following requirements:

  • Your streaming table must use the PREVIEW channel. See channel in Pipeline configurations.
  • Databricks recommends Unity Catalog and serverless compute. Incremental refresh is only supported on serverless compute.

When to use REPLACE WHERE flows

Use REPLACE WHERE flows for the following scenarios:

  • Incremental batch processing without streaming semantics: Process new rows in batches without managing streaming concepts such as watermarks.
  • Selective reprocessing: Recompute only rows that match a predicate while leaving all other rows untouched.
  • Scenarios beyond standard materialized view capabilities:
    • Target tables with longer retention than the source
    • Preventing recomputation when a dimension table changes
    • Schema evolution without recomputing entire history

Create a REPLACE WHERE flow

Use the FLOW REPLACE WHERE clause inline with CREATE OR REFRESH STREAMING TABLE:

CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

During refresh, all rows in the target table that match the predicate are deleted, the source query is recomputed for that same predicate range, and the new results are inserted. In this example, all rows from the last 7 days are deleted from orders_enriched and recomputed using the source query.

You don't need to add the predicate to the source query. The pipeline engine automatically applies it when reading from the source.
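For the example above, the effective read during refresh is roughly equivalent to the source query with the predicate appended. This is an illustrative approximation of the pushdown, not the literal query plan:

SELECT o.order_id, o.date, o.region, p.product_name, o.qty, o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id
WHERE o.date >= date_add(current_date(), -7);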

Note

BY NAME is required. It ensures columns are matched by name rather than position.

Backfill historical data

To perform backfills, run DML statements directly on the target table:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Full refresh behavior

A full refresh of a REPLACE WHERE flow re-executes the source query using only the current predicate. Rows that were inserted by DML statements outside the current predicate range are permanently deleted.

Warning

A full refresh clears all existing data and re-executes the flow using only its defined predicate. If a pipeline has been running for a year with a 7-day predicate, a full refresh results in the table containing only the last 7 days of data. All older rows are permanently deleted.

REFRESH STREAMING TABLE orders_enriched FULL;

To prevent full refreshes on a table, set the table property pipelines.reset.allowed to false:

CREATE OR REFRESH STREAMING TABLE orders_enriched
  TBLPROPERTIES (pipelines.reset.allowed = 'false')
  FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
  ...

Incremental refresh

REPLACE WHERE flows use incremental refresh when possible, reprocessing only the source data that has changed since the last refresh rather than recomputing the entire replace window. Incremental refresh requires serverless compute.

When incremental refresh applies

All of the following must be true:

  • The pipeline runs on serverless compute.
  • The query shape is supported. See Incremental refresh for the supported operator set.
  • The predicate references base columns from a source table. Predicates on derived values, such as aggregate or window function outputs, cannot be pushed to a source, which disables incremental refresh.
  • No external DML has modified rows in the current replace window. DML that modifies rows outside the current window does not affect eligibility.
  • The current replace window does not include rows that the previous predicate excluded. If you widen the predicate to cover a range not previously processed, that one refresh falls back to full recomputation. Subsequent refreshes are eligible for incremental refresh again.
  • The predicate is deterministic. Predicates using non-deterministic functions such as rand() disable incremental refresh. Temporal functions such as current_date() are allowed.

The first refresh of any flow is always a full computation. If any condition is not met, that refresh falls back to full recomputation of the current replace window.
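For example, widening the replace window from 7 to 30 days includes 23 days of rows that the previous predicate never processed, so the next refresh fully recomputes the 30-day window. Refreshes after that are eligible for incremental refresh again:

-- Previous refreshes ran with a 7-day window:
--   FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
-- After widening to 30 days, the next refresh recomputes the whole window:
FLOW REPLACE WHERE date >= date_add(current_date(), -30) BY NAME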

Best practices for incremental refresh

Follow these guidelines so REPLACE WHERE flows remain eligible for incremental refresh.

Use a moving lower bound

Predicates with a moving lower bound remain eligible for incremental refresh indefinitely.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

A moving upper bound, such as date BETWEEN date_add(current_date(), -7) AND current_date(), can shift the window to include previously excluded rows, triggering a one-off fallback to full recomputation.

Include the predicate column in GROUP BY

When aggregating, include the predicate column in GROUP BY so the engine can push the predicate below the aggregation.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

If the predicate column is missing from GROUP BY, the predicate cannot be pushed below the aggregation and the source is scanned in full.

Include the predicate column in join keys

Include the predicate column in the join condition so the engine can prune all joined sources.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

If a joined table does not expose the predicate column, that table is scanned in full on each refresh.

Diagnose fallback to full recomputation

When a refresh falls back to full recomputation, the reason is reported in the planning_information event for the flow. See Monitor pipeline event logs. The following reasons can be reported in the event:

  • EXTERNAL_CHANGE_IN_REPLACE_WINDOW: An external DML statement modified rows in the current replace window.
  • REPLACE_WHERE_NOT_DETERMINISTIC: The predicate uses non-deterministic expressions.
  • PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC: The previous refresh used a non-deterministic predicate.
  • UNSUPPORTED_REPLACE_WHERE_PREDICATE: The predicate cannot be pushed to any source, the current window includes rows not processed by the previous predicate, or the run uses a predicate override.
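You can inspect these events by querying the table's event log. The following is a sketch; the details:planning_information path assumes the event log's JSON layout in your workspace, so verify the schema before relying on it:

SELECT timestamp, details:planning_information
FROM event_log(TABLE(orders_enriched))
WHERE event_type = 'planning_information'
ORDER BY timestamp DESC;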

Examples

The following examples show common REPLACE WHERE flow patterns.

Example 1: Keep historical aggregates from a limited-retention source

This example maintains daily aggregates indefinitely, even after raw data ages out of the source table, which retains only 3 days of data:

CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Example 2: Prevent recomputation when a dimension table changes

This example keeps historical fact rows unchanged when dimension attributes change:

CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

If a user's region changes, only recent rows are recomputed. Historical rows retain the region value at the time they were written.

Example 3: Add a new metric without recomputing full history

This example shows how to evolve a table definition and backfill only a targeted range:

  1. Define the initial table:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    
  2. Update the query to add uniq_users:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Rows older than the 7-day window contain NULL for uniq_users.

Example 4: Iterate on a small window before backfilling full history

This example shows how to validate query logic on a small data window before processing the full historical range.

Start with a short window to validate metrics and iterate on business logic with lower compute costs:

CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

A short window recomputes only the last 7 days on each refresh, so you can revise the query as many times as needed before committing to a full historical run.

Once the query is finalized, use DML to backfill the full historical range:

INSERT INTO revenue_attribution
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;