Pemrosesan batch menggunakan alur REPLACE WHERE

Important

Alur REPLACE WHERE masih dalam versi Beta.

Halaman ini menjelaskan cara menggunakan alur REPLACE WHERE di Lakeflow Spark Declarative Pipelines untuk mengolah ulang dan menimpa subset tabel yang ditargetkan tanpa memproses ulang seluruh riwayat tabel Anda. Alur REPLACE WHERE menangani data yang datang terlambat, pemrosesan ulang upstream, evolusi skema, dan pengisian ulang data historis.

Dengan alur REPLACE WHERE , Anda menentukan predikat pada tabel target. Semua baris yang cocok dengan predikat dihapus dan diganti dengan mengevaluasi ulang kueri sumber untuk rentang predikat yang sama. Baris yang tidak cocok dengan predikat dibiarkan tidak tersentuh.

Requirements

Alur REPLACE WHERE memiliki persyaratan berikut:

  • Alur Anda harus menggunakan PREVIEW saluran.
  • Databricks merekomendasikan Unity Catalog dan komputasi tanpa server. Refresh bertahap hanya didukung pada komputasi tanpa server.

Kapan menggunakan alur REPLACE WHERE

Gunakan alur REPLACE WHERE untuk skenario berikut:

  • Pemrosesan batch inkremental tanpa semantik streaming: Proses baris baru secara batch tanpa mengelola konsep streaming seperti watermark.
  • Pemrosesan ulang selektif: Komputasi ulang hanya baris yang cocok dengan predikat sambil membiarkan semua baris lain tidak tersentuh.
  • Skenario di luar kemampuan tampilan materialisasi standar:
    • Tabel target yang memiliki periode retensi lebih lama dibandingkan tabel sumber
    • Mencegah komputasi ulang saat tabel dimensi berubah
    • Evolusi skema tanpa mengolah ulang seluruh riwayat

Membuat alur REPLACE WHERE

Tentukan alur REPLACE WHERE baik di SQL atau Python.

SQL

Gunakan klausa sebaris FLOW REPLACE WHERE dengan CREATE STREAMING TABLE:

CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Atau, gunakan sintaks bentuk CREATE FLOW panjang:

CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Python

Dalam Python, tabel dan alur ditentukan dalam satu pernyataan. Alur mewarisi nama yang sama dengan tabel:

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
  orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
  product_dim = spark.read.table("product_dim")
  return orders_fct.join(product_dim, "product_id")

Parameter replace_where menerima ekspresi kolom PySpark atau predikat string.

Dalam contoh ini, semua baris dari 7 hari terakhir dihapus dari orders_enriched dan dikomputasi ulang menggunakan kueri sumber. Anda tidak perlu menambahkan predikat ke kueri sumber. Mesin alur secara otomatis menerapkannya saat membaca dari sumber.

Note

BY NAME diperlukan dalam SQL. Ini mencocokkan kolom berdasarkan nama, bukan berdasarkan posisi.

Mengisi ulang data historis

Untuk menulis baris data historis atau yang telah dikoreksi ke dalam tabel target di luar penyegaran terjadwal, pilih salah satu dari dua mekanisme berdasarkan lokasi data historis disimpan:

  • Penggantian predikat: Jalankan kembali kueri sumber alur untuk rentang predikat sekali saja. Gunakan saat data historis berasal dari sumber yang sama dengan data inkremental.
  • Pernyataan DML: Sisipkan ke tabel target secara langsung, melewati alur. Gunakan saat data historis berada di sumber yang berbeda dari data inkremental.

Penggantian predikat

Ambil alih predikat REPLACE WHERE untuk pembaruan alur tunggal tanpa memodifikasi definisi alur. Penggantian predikat hanya berlaku satu kali, hanya diterapkan pada pembaruan saat ini, dan tidak memengaruhi eksekusi berikutnya.

Contoh: Beban historis awal

Untuk melakukan pengisian balik data historis satu kali saat pertama kali menyiapkan pipeline:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
)
print(resp)

Contoh: Memperbaiki kolom untuk periode tertentu

Setelah memperbarui definisi kolom, isi ulang perubahan untuk rentang historis yang ditargetkan:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
  refresh_selection=["orders_enriched"],
)
print(resp)

Gabungkan beberapa dimensi dalam satu penggantian predikat:

overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
  }
]
Fungsi pembantu: start_update_with_replace_where

Gunakan API pembaruan pipeline dari notebook untuk mengirimkan override predikat:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
  pipeline_id: str,
  replace_where_overrides: list[dict],
  refresh_selection: list[str] = None,
) -> StartUpdateResponse:
  """Start a pipeline update with REPLACE WHERE predicate overrides."""
  client = WorkspaceClient()

  body = {
    "pipeline_id": pipeline_id,
    "cause": "JOB_TASK",
    "update_cause_details": {
      "job_details": {"performance_target": "PERFORMANCE"}
    },
    "replace_where_overrides": replace_where_overrides,
  }

  if refresh_selection:
    body["refresh_selection"] = refresh_selection

  res = client.api_client.do(
    "POST",
    f"/api/2.0/pipelines/{pipeline_id}/updates",
    body=body,
    headers={"Accept": "application/json", "Content-Type": "application/json"},
  )

  return StartUpdateResponse.from_dict(res)

Statemen DML

Jalankan pernyataan DML langsung pada tabel target dari luar pipeline untuk melakukan pemuatan awal atau koreksi, seperti memuat data dari tabel lama:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Baris yang disisipkan melalui DML tidak dipengaruhi oleh predikat REPLACE WHERE dan tetap ada setelah penyegaran terjadwal, kecuali jika baris tersebut termasuk dalam rentang predikat pada eksekusi mendatang.

Perilaku refresh penuh

Penyegaran penuh pada alur REPLACE WHERE mengeksekusi ulang kueri sumber dengan hanya menggunakan predikat saat ini. Baris yang disisipkan oleh penimpaan predikat atau pernyataan DML di luar rentang predikat saat ini dihapus secara permanen.

Warning

Refresh penuh menghapus semua data yang ada dan menjalankan kembali alur hanya menggunakan predikat yang ditentukan. Jika alur telah berjalan selama setahun dengan predikat 7 hari, refresh penuh menghasilkan tabel yang hanya berisi data 7 hari terakhir. Semua baris lama dihapus secara permanen.

Untuk mencegah refresh penuh pada tabel, atur properti pipelines.reset.allowed tabel ke false. Lihat Referensi properti alur.

Penyegaran bertahap

Alur REPLACE WHERE menggunakan refresh bertambah bertahap jika memungkinkan, hanya memproses ulang data sumber yang telah berubah sejak refresh terakhir daripada mengolah ulang seluruh jendela penggantian. Penyegaran inkremental memerlukan komputasi nirserver.

Saat refresh bertahap berlaku

Semua hal berikut ini harus benar:

  • Alur berjalan pada komputasi tanpa server.
  • Bentuk kueri didukung. Lihat Penyegaran inkremental untuk kumpulan operator yang didukung.
  • Predikat mereferensikan kolom dasar dari tabel sumber. Predikat pada nilai turunan, seperti hasil fungsi agregat atau fungsi jendela, tidak dapat diterapkan ke sumber data, yang menonaktifkan penyegaran bertahap.
  • Tidak ada DML eksternal yang telah memodifikasi baris di jendela ganti saat ini. DML yang memodifikasi baris di luar jendela saat ini tidak terpengaruh.
  • Jendela ganti saat ini tidak menyertakan baris yang dikecualikan oleh predikat sebelumnya. Jika Anda memperlebar predikat untuk mencakup rentang yang sebelumnya tidak diproses, refresh tersebut akan kembali ke komputasi ulang penuh. Refresh berikutnya dapat di-refresh secara inkremental kembali.
  • Predikatnya deterministik. Predikat menggunakan fungsi non-deterministik seperti rand() menonaktifkan refresh bertahap. Fungsi temporal seperti current_date() diizinkan.

Refresh pertama dari alur apa pun selalu merupakan komputasi penuh. Jika ada kondisi yang tidak terpenuhi, refresh tersebut kembali ke komputasi ulang penuh dari jendela ganti saat ini.

Praktik terbaik untuk penyegaran inkremental

Ikuti panduan ini agar alur REPLACE WHERE tetap memenuhi syarat untuk penyegaran inkremental.

Gunakan batas bawah bergerak

Predikat dengan batas bawah bergerak tetap memenuhi syarat untuk refresh bertahap tanpa batas waktu.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

Batas atas dinamis, seperti date BETWEEN date_add(current_date(), -7) AND current_date(), dapat menggeser jendela untuk menyertakan baris yang sebelumnya dikecualikan, sehingga memicu peralihan satu kali ke penghitungan ulang penuh.

Sertakan kolom predikat di GROUP BY

Saat melakukan agregasi, sertakan kolom predikat dalam GROUP BY agar mesin dapat mendorong predikat ke bawah agregasi.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Jika kolom predikat tidak ada di GROUP BY, predikat tidak dapat didorong ke bawah agregasi dan sumber dipindai sepenuhnya.

Sertakan kolom predikat dalam kunci gabungan

Sertakan kolom predikat dalam kondisi join agar engine dapat memangkas semua sumber yang di-join.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Jika tabel yang digabungkan tidak mengekspos kolom predikat, tabel tersebut dipindai secara penuh pada setiap refresh.

Diagnosis pengalihan ke penghitungan ulang penuh

Ketika refresh beralih ke penghitungan ulang penuh, alasannya dilaporkan dalam peristiwa planning_information untuk alur tersebut. Lihat Memantau log peristiwa alur. Tabel berikut ini mencantumkan alasan yang dilaporkan dalam peristiwa:

Reason Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW DML eksternal memodifikasi baris di jendela penggantian saat ini.
REPLACE_WHERE_NOT_DETERMINISTIC Predikat menggunakan ekspresi non-deterministik.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC Refresh sebelumnya menggunakan predikat non-deterministik.
UNSUPPORTED_REPLACE_WHERE_PREDICATE Predikat tidak dapat didorong ke sumber mana pun, jendela saat ini menyertakan baris yang tidak diproses oleh predikat sebelumnya, atau eksekusi menggunakan penimpaan predikat.

Keterbatasan

Alur REPLACE WHERE memiliki batasan berikut:

  • Tabel target harus dibuat di dalam pipeline.
  • Hanya satu alur REPLACE WHERE yang diizinkan per tabel target.
  • Tabel yang menjadi target alur REPLACE WHERE tidak dapat juga menjadi target tipe alur lain, seperti alur AUTO CDC atau alur append.
  • Ekspektasi tidak didukung pada tabel yang menjadi target alur REPLACE WHERE.
  • Untuk tabel streaming yang dibuat di Databricks SQL, lihat alur REPLACE WHERE untuk tabel streaming independen untuk sintaks dan perbedaan pengisian ulang data.

Examples

Contoh berikut menunjukkan pola alur REPLACE WHERE umum.

Contoh 1: Menyimpan agregat historis dari sumber retensi terbatas

Contoh ini mempertahankan agregat harian selamanya, bahkan setelah data mentah tidak lagi disimpan dalam tabel sumber (retensi 3 hari):

SQL

CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
  return (
    spark.read.table("events_raw")
      .groupBy("date", "key")
      .agg(F.sum("val").alias("agg"))
  )

Contoh 2: Mencegah komputasi ulang saat tabel dimensi berubah

Contoh ini menjaga baris fakta historis tidak berubah saat atribut dimensi berubah:

SQL

CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
  fact_table = spark.read.table("fact_table").alias("f")
  dim_users = spark.read.table("dim_users").alias("d")
  return (
    fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
      .select(
        col("f.date"),
        col("f.user_id"),
        col("d.region"),
        col("f.revenue"),
      )
  )

Jika wilayah pengguna berubah, hanya baris terbaru yang dikomputasi ulang. Baris historis mempertahankan nilai wilayah pada saat baris tersebut ditulis. Untuk memperbaiki baris data historis, jalankan backfill secara terarah menggunakan penimpaan predikat.

Contoh 3: Menambahkan metrik baru tanpa mengolah ulang riwayat lengkap

Contoh ini menunjukkan cara mengembangkan definisi tabel dan mengisi ulang hanya rentang yang ditargetkan:

  1. Tentukan tabel awal:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    from pyspark.sql.functions import col
    
    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(F.count("*").alias("clicks"))
      )
    
  2. Perbarui kueri untuk menambahkan uniq_users:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(
            F.count("*").alias("clicks"),
            F.countDistinct("user_id").alias("uniq_users"),
          )
      )
    
  3. Isi ulang metrik baru selama 30 hari terakhir:

    overrides = [
      {
        "flow_name": "clickstream_daily",
        "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
      }
    ]
    
    resp = start_update_with_replace_where(
      pipeline_id="<pipeline-id>",
      replace_where_overrides=overrides,
      refresh_selection=["clickstream_daily"],
    )
    

    Baris yang lebih lama dari rentang yang diisi ulang berisi NULL untuk uniq_users.

Contoh 4: Iterasi pada jendela kecil sebelum mengisi ulang riwayat penuh

Contoh ini menunjukkan cara memvalidasi logika kueri pada jendela data kecil sebelum memproses rentang historis lengkap.

Mulailah dengan jendela pendek sehingga setiap refresh hanya mengolah ulang 7 hari terakhir saat Anda merevisi kueri:

SQL

CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
  return (
    spark.read.table("marketing_events")
      .groupBy("event_date", "campaign_id")
      .agg(F.sum("revenue").alias("total_revenue"))
  )

Setelah kueri difinalisasi, gunakan penggantian predikat untuk melakukan pengisian balik data historis satu kali:

overrides = [
  {
    "flow_name": "revenue_attribution",
    "predicate_override": "event_date >= date_add(current_date(), -365)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id="<pipeline-id>",
  replace_where_overrides=overrides,
  refresh_selection=["revenue_attribution"],
)