Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Important
Alur REPLACE WHERE untuk tabel streaming mandiri masih dalam Beta.
Halaman ini menjelaskan cara menggunakan alur kerja REPLACE WHERE untuk menghitung ulang dan menimpa subset tertentu yang ditargetkan dari tabel streaming mandiri tanpa memproses ulang seluruh riwayat tabel Anda. Alur REPLACE WHERE menangani data yang datang terlambat, pemrosesan ulang upstream, evolusi skema, dan pengisian ulang data historis.
Dengan alur REPLACE WHERE , Anda menentukan predikat pada tabel target. Semua baris yang cocok dengan predikat dihapus dan diganti dengan mengevaluasi ulang kueri sumber untuk rentang predikat yang sama. Baris yang tidak cocok dengan predikat dibiarkan tidak tersentuh.
Persyaratan
Alur REPLACE WHERE memiliki persyaratan berikut:
- Tabel streaming Anda harus menggunakan
PREVIEWsaluran. Lihatchanneldi Konfigurasi alur. - Databricks merekomendasikan Unity Catalog dan komputasi tanpa server. Refresh bertahap hanya didukung pada komputasi tanpa server.
Kapan menggunakan alur REPLACE WHERE
Gunakan alur REPLACE WHERE untuk skenario berikut:
- Pemrosesan batch inkremental tanpa semantik streaming: Proses baris baru secara batch tanpa mengelola konsep streaming seperti watermark.
- Pemrosesan ulang selektif: Komputasi ulang hanya baris yang cocok dengan predikat sambil membiarkan semua baris lain tidak tersentuh.
-
Skenario di luar kemampuan tampilan materialisasi standar:
- Tabel target yang memiliki periode retensi lebih lama dibandingkan tabel sumber
- Mencegah komputasi ulang saat tabel dimensi berubah
- Evolusi skema tanpa mengolah ulang seluruh riwayat
Membuat alur REPLACE WHERE
Gunakan klausa sebaris FLOW REPLACE WHERE dengan CREATE OR REFRESH STREAMING TABLE:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Selama refresh, semua baris dalam tabel target yang cocok dengan predikat dihapus, kueri sumber dikomputasi ulang untuk rentang predikat yang sama, dan hasil baru dimasukkan. Dalam contoh ini, semua baris dari 7 hari terakhir dihapus dari orders_enriched dan dikomputasi ulang menggunakan kueri sumber.
Anda tidak perlu menambahkan predikat ke kueri sumber. Mesin alur secara otomatis menerapkannya saat membaca dari sumber.
Note
BY NAME diperlukan. Ini memastikan kolom dicocokkan berdasarkan nama daripada posisi.
Mengisi ulang data historis
Untuk melakukan backfill, jalankan pernyataan DML langsung pada tabel target:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Perilaku refresh penuh
Penyegaran penuh pada alur REPLACE WHERE mengeksekusi ulang kueri sumber dengan hanya menggunakan predikat saat ini. Baris yang disisipkan oleh pernyataan DML di luar rentang predikat saat ini dihapus secara permanen.
Warning
Refresh penuh menghapus semua data yang ada dan menjalankan kembali alur hanya menggunakan predikat yang ditentukan. Jika alur telah berjalan selama setahun dengan predikat 7 hari, refresh penuh menghasilkan tabel yang hanya berisi data 7 hari terakhir. Semua baris lama dihapus secara permanen.
REFRESH STREAMING TABLE orders_enriched FULL;
Untuk mencegah refresh penuh pada tabel, atur properti pipelines.reset.allowed tabel ke false:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
...
Penyegaran bertahap
Alur REPLACE WHERE menggunakan refresh bertambah bertahap jika memungkinkan, hanya memproses ulang data sumber yang telah berubah sejak refresh terakhir daripada mengolah ulang seluruh jendela penggantian. Penyegaran inkremental memerlukan komputasi nirserver.
Saat refresh bertahap berlaku
Semua hal berikut ini harus benar:
- Alur berjalan pada komputasi tanpa server.
- Bentuk kueri didukung. Lihat Penyegaran inkremental untuk kumpulan operator yang didukung.
- Predikat mereferensikan kolom dasar dari tabel sumber. Predikat pada nilai turunan, seperti hasil fungsi agregat atau fungsi jendela, tidak dapat diterapkan ke sumber data, yang menonaktifkan penyegaran bertahap.
- Tidak ada DML eksternal yang telah memodifikasi baris di jendela ganti saat ini. DML yang memodifikasi baris di luar jendela saat ini tidak terpengaruh.
- Jendela ganti saat ini tidak menyertakan baris yang dikecualikan oleh predikat sebelumnya. Jika Anda memperlebar predikat untuk mencakup rentang yang sebelumnya tidak diproses, refresh tersebut akan kembali ke komputasi ulang penuh. Refresh berikutnya dapat di-refresh secara inkremental kembali.
- Predikatnya deterministik. Predikat menggunakan fungsi non-deterministik seperti
rand()menonaktifkan refresh bertahap. Fungsi temporal seperticurrent_date()diizinkan.
Refresh pertama dari alur apa pun selalu merupakan komputasi penuh. Jika ada kondisi yang tidak terpenuhi, refresh tersebut kembali ke komputasi ulang penuh dari jendela ganti saat ini.
Praktik terbaik untuk penyegaran inkremental
Ikuti panduan ini agar alur REPLACE WHERE tetap memenuhi syarat untuk penyegaran inkremental.
Gunakan batas bawah bergerak
Predikat dengan batas bawah bergerak tetap memenuhi syarat untuk refresh bertahap tanpa batas waktu.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Batas atas dinamis, seperti date BETWEEN date_add(current_date(), -7) AND current_date(), dapat menggeser jendela untuk menyertakan baris yang sebelumnya dikecualikan, sehingga memicu peralihan satu kali ke penghitungan ulang penuh.
Sertakan kolom predikat di GROUP BY
Saat melakukan agregasi, sertakan kolom predikat dalam GROUP BY agar mesin dapat mendorong predikat ke bawah agregasi.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Jika kolom predikat tidak ada di GROUP BY, predikat tidak dapat didorong ke bawah agregasi dan sumber dipindai sepenuhnya.
Sertakan kolom predikat dalam kunci gabungan
Sertakan kolom predikat dalam kondisi join agar engine dapat memangkas semua sumber yang di-join.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Jika tabel yang digabungkan tidak mengekspos kolom predikat, tabel tersebut dipindai secara penuh pada setiap refresh.
Diagnosis pengalihan ke penghitungan ulang penuh
Ketika refresh beralih ke penghitungan ulang penuh, alasannya dilaporkan dalam peristiwa planning_information untuk alur tersebut. Lihat Memantau log peristiwa alur. Tabel berikut ini mencantumkan alasan yang dilaporkan dalam peristiwa:
| Reason | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
DML eksternal memodifikasi baris di jendela penggantian saat ini. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Predikat menggunakan ekspresi non-deterministik. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
Refresh sebelumnya menggunakan predikat non-deterministik. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Predikat tidak dapat didorong ke sumber mana pun, jendela saat ini menyertakan baris yang tidak diproses oleh predikat sebelumnya, atau eksekusi menggunakan penimpaan predikat. |
Examples
Contoh berikut menunjukkan pola alur REPLACE WHERE umum.
Contoh 1: Menyimpan agregat historis dari sumber retensi terbatas
Contoh ini mempertahankan agregat harian selamanya, bahkan setelah data mentah tidak lagi disimpan dalam tabel sumber (retensi 3 hari):
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Contoh 2: Mencegah komputasi ulang saat tabel dimensi berubah
Contoh ini menjaga baris fakta historis tidak berubah saat atribut dimensi berubah:
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Jika wilayah pengguna berubah, hanya baris terbaru yang dikomputasi ulang. Baris historis mempertahankan nilai wilayah pada saat baris tersebut ditulis.
Contoh 3: Menambahkan metrik baru tanpa mengolah ulang riwayat lengkap
Contoh ini menunjukkan cara mengembangkan definisi tabel dan mengisi ulang hanya rentang yang ditargetkan:
Tentukan tabel awal:
CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Perbarui kueri untuk menambahkan
uniq_users:CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Baris yang lebih lama dari jendela 7 hari berisi
NULLuntukuniq_users.
Contoh 4: Iterasi pada jendela kecil sebelum mengisi ulang riwayat penuh
Contoh ini menunjukkan cara memvalidasi logika kueri pada jendela data kecil sebelum memproses rentang historis lengkap.
Mulailah dengan jendela pendek untuk memvalidasi metrik dan melakukan iterasi pada logika bisnis dengan biaya komputasi yang lebih rendah:
CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Jendela pendek hanya mengolah ulang 7 hari terakhir pada setiap refresh, jadi revisi kueri sebanyak yang diperlukan sebelum berkomitmen pada eksekusi historis penuh.
Setelah kueri diselesaikan, gunakan DML untuk mengisi ulang rentang historis lengkap:
INSERT INTO revenue_attribution
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;