Alur REPLACE WHERE untuk tabel streaming mandiri

Important

Alur REPLACE WHERE untuk tabel streaming mandiri masih dalam Beta.

Halaman ini menjelaskan cara menggunakan alur kerja REPLACE WHERE untuk menghitung ulang dan menimpa subset tertentu yang ditargetkan dari tabel streaming mandiri tanpa memproses ulang seluruh riwayat tabel Anda. Alur REPLACE WHERE menangani data yang datang terlambat, pemrosesan ulang upstream, evolusi skema, dan pengisian ulang data historis.

Dengan alur REPLACE WHERE , Anda menentukan predikat pada tabel target. Semua baris yang cocok dengan predikat dihapus dan diganti dengan mengevaluasi ulang kueri sumber untuk rentang predikat yang sama. Baris yang tidak cocok dengan predikat dibiarkan tidak tersentuh.

Persyaratan

Alur REPLACE WHERE memiliki persyaratan berikut:

  • Tabel streaming Anda harus menggunakan PREVIEW saluran. Lihat channel di Konfigurasi alur.
  • Databricks merekomendasikan Unity Catalog dan komputasi tanpa server. Refresh bertahap hanya didukung pada komputasi tanpa server.

Kapan menggunakan alur REPLACE WHERE

Gunakan alur REPLACE WHERE untuk skenario berikut:

  • Pemrosesan batch inkremental tanpa semantik streaming: Proses baris baru secara batch tanpa mengelola konsep streaming seperti watermark.
  • Pemrosesan ulang selektif: Komputasi ulang hanya baris yang cocok dengan predikat sambil membiarkan semua baris lain tidak tersentuh.
  • Skenario di luar kemampuan tampilan materialisasi standar:
    • Tabel target yang memiliki periode retensi lebih lama dibandingkan tabel sumber
    • Mencegah komputasi ulang saat tabel dimensi berubah
    • Evolusi skema tanpa mengolah ulang seluruh riwayat

Membuat alur REPLACE WHERE

Gunakan klausa sebaris FLOW REPLACE WHERE dengan CREATE OR REFRESH STREAMING TABLE:

CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Selama refresh, semua baris dalam tabel target yang cocok dengan predikat dihapus, kueri sumber dikomputasi ulang untuk rentang predikat yang sama, dan hasil baru dimasukkan. Dalam contoh ini, semua baris dari 7 hari terakhir dihapus dari orders_enriched dan dikomputasi ulang menggunakan kueri sumber.

Anda tidak perlu menambahkan predikat ke kueri sumber. Mesin alur secara otomatis menerapkannya saat membaca dari sumber.

Note

BY NAME diperlukan. Ini memastikan kolom dicocokkan berdasarkan nama daripada posisi.

Mengisi ulang data historis

Untuk melakukan backfill, jalankan pernyataan DML langsung pada tabel target:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Perilaku refresh penuh

Penyegaran penuh pada alur REPLACE WHERE mengeksekusi ulang kueri sumber dengan hanya menggunakan predikat saat ini. Baris yang disisipkan oleh pernyataan DML di luar rentang predikat saat ini dihapus secara permanen.

Warning

Refresh penuh menghapus semua data yang ada dan menjalankan kembali alur hanya menggunakan predikat yang ditentukan. Jika alur telah berjalan selama setahun dengan predikat 7 hari, refresh penuh menghasilkan tabel yang hanya berisi data 7 hari terakhir. Semua baris lama dihapus secara permanen.

REFRESH STREAMING TABLE orders_enriched FULL;

Untuk mencegah refresh penuh pada tabel, atur properti pipelines.reset.allowed tabel ke false:

CREATE OR REFRESH STREAMING TABLE orders_enriched
  TBLPROPERTIES (pipelines.reset.allowed = 'false')
  FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
  ...

Penyegaran bertahap

Alur REPLACE WHERE menggunakan refresh bertambah bertahap jika memungkinkan, hanya memproses ulang data sumber yang telah berubah sejak refresh terakhir daripada mengolah ulang seluruh jendela penggantian. Penyegaran inkremental memerlukan komputasi nirserver.

Saat refresh bertahap berlaku

Semua hal berikut ini harus benar:

  • Alur berjalan pada komputasi tanpa server.
  • Bentuk kueri didukung. Lihat Penyegaran inkremental untuk kumpulan operator yang didukung.
  • Predikat mereferensikan kolom dasar dari tabel sumber. Predikat pada nilai turunan, seperti hasil fungsi agregat atau fungsi jendela, tidak dapat diterapkan ke sumber data, yang menonaktifkan penyegaran bertahap.
  • Tidak ada DML eksternal yang telah memodifikasi baris di jendela ganti saat ini. DML yang memodifikasi baris di luar jendela saat ini tidak terpengaruh.
  • Jendela ganti saat ini tidak menyertakan baris yang dikecualikan oleh predikat sebelumnya. Jika Anda memperlebar predikat untuk mencakup rentang yang sebelumnya tidak diproses, refresh tersebut akan kembali ke komputasi ulang penuh. Refresh berikutnya dapat di-refresh secara inkremental kembali.
  • Predikatnya deterministik. Predikat menggunakan fungsi non-deterministik seperti rand() menonaktifkan refresh bertahap. Fungsi temporal seperti current_date() diizinkan.

Refresh pertama dari alur apa pun selalu merupakan komputasi penuh. Jika ada kondisi yang tidak terpenuhi, refresh tersebut kembali ke komputasi ulang penuh dari jendela ganti saat ini.

Praktik terbaik untuk penyegaran inkremental

Ikuti panduan ini agar alur REPLACE WHERE tetap memenuhi syarat untuk penyegaran inkremental.

Gunakan batas bawah bergerak

Predikat dengan batas bawah bergerak tetap memenuhi syarat untuk refresh bertahap tanpa batas waktu.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

Batas atas dinamis, seperti date BETWEEN date_add(current_date(), -7) AND current_date(), dapat menggeser jendela untuk menyertakan baris yang sebelumnya dikecualikan, sehingga memicu peralihan satu kali ke penghitungan ulang penuh.

Sertakan kolom predikat di GROUP BY

Saat melakukan agregasi, sertakan kolom predikat dalam GROUP BY agar mesin dapat mendorong predikat ke bawah agregasi.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Jika kolom predikat tidak ada di GROUP BY, predikat tidak dapat didorong ke bawah agregasi dan sumber dipindai sepenuhnya.

Sertakan kolom predikat dalam kunci gabungan

Sertakan kolom predikat dalam kondisi join agar engine dapat memangkas semua sumber yang di-join.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Jika tabel yang digabungkan tidak mengekspos kolom predikat, tabel tersebut dipindai secara penuh pada setiap refresh.

Diagnosis pengalihan ke penghitungan ulang penuh

Ketika refresh beralih ke penghitungan ulang penuh, alasannya dilaporkan dalam peristiwa planning_information untuk alur tersebut. Lihat Memantau log peristiwa alur. Tabel berikut ini mencantumkan alasan yang dilaporkan dalam peristiwa:

Reason Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW DML eksternal memodifikasi baris di jendela penggantian saat ini.
REPLACE_WHERE_NOT_DETERMINISTIC Predikat menggunakan ekspresi non-deterministik.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC Refresh sebelumnya menggunakan predikat non-deterministik.
UNSUPPORTED_REPLACE_WHERE_PREDICATE Predikat tidak dapat didorong ke sumber mana pun, jendela saat ini menyertakan baris yang tidak diproses oleh predikat sebelumnya, atau eksekusi menggunakan penimpaan predikat.

Examples

Contoh berikut menunjukkan pola alur REPLACE WHERE umum.

Contoh 1: Menyimpan agregat historis dari sumber retensi terbatas

Contoh ini mempertahankan agregat harian selamanya, bahkan setelah data mentah tidak lagi disimpan dalam tabel sumber (retensi 3 hari):

CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Contoh 2: Mencegah komputasi ulang saat tabel dimensi berubah

Contoh ini menjaga baris fakta historis tidak berubah saat atribut dimensi berubah:

CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Jika wilayah pengguna berubah, hanya baris terbaru yang dikomputasi ulang. Baris historis mempertahankan nilai wilayah pada saat baris tersebut ditulis.

Contoh 3: Menambahkan metrik baru tanpa mengolah ulang riwayat lengkap

Contoh ini menunjukkan cara mengembangkan definisi tabel dan mengisi ulang hanya rentang yang ditargetkan:

  1. Tentukan tabel awal:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    
  2. Perbarui kueri untuk menambahkan uniq_users:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Baris yang lebih lama dari jendela 7 hari berisi NULL untuk uniq_users.

Contoh 4: Iterasi pada jendela kecil sebelum mengisi ulang riwayat penuh

Contoh ini menunjukkan cara memvalidasi logika kueri pada jendela data kecil sebelum memproses rentang historis lengkap.

Mulailah dengan jendela pendek untuk memvalidasi metrik dan melakukan iterasi pada logika bisnis dengan biaya komputasi yang lebih rendah:

CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Jendela pendek hanya mengolah ulang 7 hari terakhir pada setiap refresh, jadi revisi kueri sebanyak yang diperlukan sebelum berkomitmen pada eksekusi historis penuh.

Setelah kueri diselesaikan, gunakan DML untuk mengisi ulang rentang historis lengkap:

INSERT INTO revenue_attribution
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;