Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Important
Alur REPLACE WHERE masih dalam versi Beta.
Halaman ini menjelaskan cara menggunakan alur REPLACE WHERE di Lakeflow Spark Declarative Pipelines untuk mengolah ulang dan menimpa subset tabel yang ditargetkan tanpa memproses ulang seluruh riwayat tabel Anda. Alur REPLACE WHERE menangani data yang datang terlambat, pemrosesan ulang upstream, evolusi skema, dan pengisian ulang data historis.
Dengan alur REPLACE WHERE , Anda menentukan predikat pada tabel target. Semua baris yang cocok dengan predikat dihapus dan diganti dengan mengevaluasi ulang kueri sumber untuk rentang predikat yang sama. Baris yang tidak cocok dengan predikat dibiarkan tidak tersentuh.
Requirements
Alur REPLACE WHERE memiliki persyaratan berikut:
- Alur Anda harus menggunakan
PREVIEWsaluran. - Databricks merekomendasikan Unity Catalog dan komputasi tanpa server. Refresh bertahap hanya didukung pada komputasi tanpa server.
Kapan menggunakan alur REPLACE WHERE
Gunakan alur REPLACE WHERE untuk skenario berikut:
- Pemrosesan batch inkremental tanpa semantik streaming: Proses baris baru secara batch tanpa mengelola konsep streaming seperti watermark.
- Pemrosesan ulang selektif: Komputasi ulang hanya baris yang cocok dengan predikat sambil membiarkan semua baris lain tidak tersentuh.
-
Skenario di luar kemampuan tampilan materialisasi standar:
- Tabel target yang memiliki periode retensi lebih lama dibandingkan tabel sumber
- Mencegah komputasi ulang saat tabel dimensi berubah
- Evolusi skema tanpa mengolah ulang seluruh riwayat
Membuat alur REPLACE WHERE
Tentukan alur REPLACE WHERE baik di SQL atau Python.
SQL
Gunakan klausa sebaris FLOW REPLACE WHERE dengan CREATE STREAMING TABLE:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Atau, gunakan sintaks bentuk CREATE FLOW panjang:
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
Dalam Python, tabel dan alur ditentukan dalam satu pernyataan. Alur mewarisi nama yang sama dengan tabel:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
Parameter replace_where menerima ekspresi kolom PySpark atau predikat string.
Dalam contoh ini, semua baris dari 7 hari terakhir dihapus dari orders_enriched dan dikomputasi ulang menggunakan kueri sumber. Anda tidak perlu menambahkan predikat ke kueri sumber. Mesin alur secara otomatis menerapkannya saat membaca dari sumber.
Note
BY NAME diperlukan dalam SQL. Ini mencocokkan kolom berdasarkan nama, bukan berdasarkan posisi.
Mengisi ulang data historis
Untuk menulis baris data historis atau yang telah dikoreksi ke dalam tabel target di luar penyegaran terjadwal, pilih salah satu dari dua mekanisme berdasarkan lokasi data historis disimpan:
- Penggantian predikat: Jalankan kembali kueri sumber alur untuk rentang predikat sekali saja. Gunakan saat data historis berasal dari sumber yang sama dengan data inkremental.
- Pernyataan DML: Sisipkan ke tabel target secara langsung, melewati alur. Gunakan saat data historis berada di sumber yang berbeda dari data inkremental.
Penggantian predikat
Ambil alih predikat REPLACE WHERE untuk pembaruan alur tunggal tanpa memodifikasi definisi alur. Penggantian predikat hanya berlaku satu kali, hanya diterapkan pada pembaruan saat ini, dan tidak memengaruhi eksekusi berikutnya.
Contoh: Beban historis awal
Untuk melakukan pengisian balik data historis satu kali saat pertama kali menyiapkan pipeline:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
Contoh: Memperbaiki kolom untuk periode tertentu
Setelah memperbarui definisi kolom, isi ulang perubahan untuk rentang historis yang ditargetkan:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
Gabungkan beberapa dimensi dalam satu penggantian predikat:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
Fungsi pembantu: start_update_with_replace_where
Gunakan API pembaruan pipeline dari notebook untuk mengirimkan override predikat:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
Statemen DML
Jalankan pernyataan DML langsung pada tabel target dari luar pipeline untuk melakukan pemuatan awal atau koreksi, seperti memuat data dari tabel lama:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Baris yang disisipkan melalui DML tidak dipengaruhi oleh predikat REPLACE WHERE dan tetap ada setelah penyegaran terjadwal, kecuali jika baris tersebut termasuk dalam rentang predikat pada eksekusi mendatang.
Perilaku refresh penuh
Penyegaran penuh pada alur REPLACE WHERE mengeksekusi ulang kueri sumber dengan hanya menggunakan predikat saat ini. Baris yang disisipkan oleh penimpaan predikat atau pernyataan DML di luar rentang predikat saat ini dihapus secara permanen.
Warning
Refresh penuh menghapus semua data yang ada dan menjalankan kembali alur hanya menggunakan predikat yang ditentukan. Jika alur telah berjalan selama setahun dengan predikat 7 hari, refresh penuh menghasilkan tabel yang hanya berisi data 7 hari terakhir. Semua baris lama dihapus secara permanen.
Untuk mencegah refresh penuh pada tabel, atur properti pipelines.reset.allowed tabel ke false. Lihat Referensi properti alur.
Penyegaran bertahap
Alur REPLACE WHERE menggunakan refresh bertambah bertahap jika memungkinkan, hanya memproses ulang data sumber yang telah berubah sejak refresh terakhir daripada mengolah ulang seluruh jendela penggantian. Penyegaran inkremental memerlukan komputasi nirserver.
Saat refresh bertahap berlaku
Semua hal berikut ini harus benar:
- Alur berjalan pada komputasi tanpa server.
- Bentuk kueri didukung. Lihat Penyegaran inkremental untuk kumpulan operator yang didukung.
- Predikat mereferensikan kolom dasar dari tabel sumber. Predikat pada nilai turunan, seperti hasil fungsi agregat atau fungsi jendela, tidak dapat diterapkan ke sumber data, yang menonaktifkan penyegaran bertahap.
- Tidak ada DML eksternal yang telah memodifikasi baris di jendela ganti saat ini. DML yang memodifikasi baris di luar jendela saat ini tidak terpengaruh.
- Jendela ganti saat ini tidak menyertakan baris yang dikecualikan oleh predikat sebelumnya. Jika Anda memperlebar predikat untuk mencakup rentang yang sebelumnya tidak diproses, refresh tersebut akan kembali ke komputasi ulang penuh. Refresh berikutnya dapat di-refresh secara inkremental kembali.
- Predikatnya deterministik. Predikat menggunakan fungsi non-deterministik seperti
rand()menonaktifkan refresh bertahap. Fungsi temporal seperticurrent_date()diizinkan.
Refresh pertama dari alur apa pun selalu merupakan komputasi penuh. Jika ada kondisi yang tidak terpenuhi, refresh tersebut kembali ke komputasi ulang penuh dari jendela ganti saat ini.
Praktik terbaik untuk penyegaran inkremental
Ikuti panduan ini agar alur REPLACE WHERE tetap memenuhi syarat untuk penyegaran inkremental.
Gunakan batas bawah bergerak
Predikat dengan batas bawah bergerak tetap memenuhi syarat untuk refresh bertahap tanpa batas waktu.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Batas atas dinamis, seperti date BETWEEN date_add(current_date(), -7) AND current_date(), dapat menggeser jendela untuk menyertakan baris yang sebelumnya dikecualikan, sehingga memicu peralihan satu kali ke penghitungan ulang penuh.
Sertakan kolom predikat di GROUP BY
Saat melakukan agregasi, sertakan kolom predikat dalam GROUP BY agar mesin dapat mendorong predikat ke bawah agregasi.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Jika kolom predikat tidak ada di GROUP BY, predikat tidak dapat didorong ke bawah agregasi dan sumber dipindai sepenuhnya.
Sertakan kolom predikat dalam kunci gabungan
Sertakan kolom predikat dalam kondisi join agar engine dapat memangkas semua sumber yang di-join.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Jika tabel yang digabungkan tidak mengekspos kolom predikat, tabel tersebut dipindai secara penuh pada setiap refresh.
Diagnosis pengalihan ke penghitungan ulang penuh
Ketika refresh beralih ke penghitungan ulang penuh, alasannya dilaporkan dalam peristiwa planning_information untuk alur tersebut. Lihat Memantau log peristiwa alur. Tabel berikut ini mencantumkan alasan yang dilaporkan dalam peristiwa:
| Reason | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
DML eksternal memodifikasi baris di jendela penggantian saat ini. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Predikat menggunakan ekspresi non-deterministik. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
Refresh sebelumnya menggunakan predikat non-deterministik. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Predikat tidak dapat didorong ke sumber mana pun, jendela saat ini menyertakan baris yang tidak diproses oleh predikat sebelumnya, atau eksekusi menggunakan penimpaan predikat. |
Keterbatasan
Alur REPLACE WHERE memiliki batasan berikut:
- Tabel target harus dibuat di dalam pipeline.
- Hanya satu alur REPLACE WHERE yang diizinkan per tabel target.
- Tabel yang menjadi target alur REPLACE WHERE tidak dapat juga menjadi target tipe alur lain, seperti alur AUTO CDC atau alur append.
- Ekspektasi tidak didukung pada tabel yang menjadi target alur REPLACE WHERE.
- Untuk tabel streaming yang dibuat di Databricks SQL, lihat alur REPLACE WHERE untuk tabel streaming independen untuk sintaks dan perbedaan pengisian ulang data.
Examples
Contoh berikut menunjukkan pola alur REPLACE WHERE umum.
Contoh 1: Menyimpan agregat historis dari sumber retensi terbatas
Contoh ini mempertahankan agregat harian selamanya, bahkan setelah data mentah tidak lagi disimpan dalam tabel sumber (retensi 3 hari):
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
Contoh 2: Mencegah komputasi ulang saat tabel dimensi berubah
Contoh ini menjaga baris fakta historis tidak berubah saat atribut dimensi berubah:
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
Jika wilayah pengguna berubah, hanya baris terbaru yang dikomputasi ulang. Baris historis mempertahankan nilai wilayah pada saat baris tersebut ditulis. Untuk memperbaiki baris data historis, jalankan backfill secara terarah menggunakan penimpaan predikat.
Contoh 3: Menambahkan metrik baru tanpa mengolah ulang riwayat lengkap
Contoh ini menunjukkan cara mengembangkan definisi tabel dan mengisi ulang hanya rentang yang ditargetkan:
Tentukan tabel awal:
SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Python
from pyspark import pipelines as dp from pyspark.sql import functions as F from pyspark.sql.functions import col @dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg(F.count("*").alias("clicks")) )Perbarui kueri untuk menambahkan
uniq_users:SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Python
@dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg( F.count("*").alias("clicks"), F.countDistinct("user_id").alias("uniq_users"), ) )Isi ulang metrik baru selama 30 hari terakhir:
overrides = [ { "flow_name": "clickstream_daily", "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'", } ] resp = start_update_with_replace_where( pipeline_id="<pipeline-id>", replace_where_overrides=overrides, refresh_selection=["clickstream_daily"], )Baris yang lebih lama dari rentang yang diisi ulang berisi
NULLuntukuniq_users.
Contoh 4: Iterasi pada jendela kecil sebelum mengisi ulang riwayat penuh
Contoh ini menunjukkan cara memvalidasi logika kueri pada jendela data kecil sebelum memproses rentang historis lengkap.
Mulailah dengan jendela pendek sehingga setiap refresh hanya mengolah ulang 7 hari terakhir saat Anda merevisi kueri:
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
Setelah kueri difinalisasi, gunakan penggantian predikat untuk melakukan pengisian balik data historis satu kali:
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)