Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Importante
I flussi REPLACE WHERE per le tabelle di streaming autonome sono disponibili in Beta.
Questa pagina descrive come usare i flussi REPLACE WHERE per ricompilare e sovrascrivere un subset di destinazione di una tabella di streaming autonoma senza rielaborare l'intera cronologia delle tabelle. I flussi REPLACE WHERE gestiscono i dati in arrivo in ritardo, la rielaborazione upstream, l'evoluzione dello schema e i backfill.
Con un flusso REPLACE WHERE si definisce un predicato nella tabella di destinazione. Tutte le righe corrispondenti al predicato vengono eliminate e sostituite rivalutando la query di origine per lo stesso intervallo di predicati. Le righe che non corrispondono al predicato vengono lasciate invariate.
Requisiti
I flussi REPLACE WHERE hanno i requisiti seguenti:
- La tabella di streaming deve usare il
PREVIEWcanale. Vederechannelin Configurazioni della pipeline. - Databricks consiglia il catalogo Unity e il calcolo serverless. L'aggiornamento incrementale è supportato solo nel calcolo serverless.
Quando usare i flussi REPLACE WHERE
Usare i flussi REPLACE WHERE per gli scenari seguenti:
- Elaborazione batch incrementale senza semantica di streaming: elaborare nuove righe in batch senza gestire concetti di streaming come filigrane.
- Rielaborazione selettiva: ricompilare solo le righe che corrispondono a un predicato lasciando invariate tutte le altre righe.
-
Scenari che superano le funzionalità di visualizzazione materializzate standard:
- Tabelle di destinazione con un periodo di conservazione più lungo rispetto alla sorgente
- Impedire la ricompilazione quando viene modificata una tabella delle dimensioni
- Evoluzione dello schema senza ricompilare l'intera cronologia
Creare un flusso REPLACE WHERE
Usa la clausola FLOW REPLACE WHERE in linea con CREATE OR REFRESH STREAMING TABLE:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Durante l'aggiornamento, tutte le righe della tabella di destinazione che corrispondono al predicato vengono eliminate, la query di origine viene ricalcodata per lo stesso intervallo di predicati e vengono inseriti i nuovi risultati. In questo esempio, tutte le righe degli ultimi 7 giorni vengono eliminate da orders_enriched e ricalcolate usando la query di origine.
Non è necessario aggiungere il predicato alla query di origine. Il motore della pipeline lo applica automaticamente durante la lettura dalla sorgente.
Note
BY NAME è obbligatorio. Garantisce che le colonne corrispondano in base al nome anziché alla posizione.
Eseguire il backfill dei dati cronologici
Per eseguire il backfill, eseguire istruzioni DML direttamente nella tabella di destinazione:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Comportamento di aggiornamento completo
Un aggiornamento completo di un flusso REPLACE WHERE esegue nuovamente la query di origine usando solo il predicato corrente. Le righe inserite dalle istruzioni DML all'esterno dell'intervallo di predicati corrente vengono eliminate definitivamente.
Avvertimento
Un aggiornamento completo cancella tutti i dati esistenti ed esegue nuovamente il flusso usando solo il predicato definito. Se una pipeline è stata eseguita per un anno con un predicato di 7 giorni, un aggiornamento completo restituisce la tabella contenente solo gli ultimi 7 giorni di dati. Tutte le righe precedenti vengono eliminate definitivamente.
REFRESH STREAMING TABLE orders_enriched FULL;
Per evitare aggiornamenti completi in una tabella, impostare la proprietà pipelines.reset.allowed table su false:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
...
Aggiornamento incrementale
I flussi REPLACE WHERE usano l'aggiornamento incrementale quando possibile, rielaborando solo i dati di origine modificati dopo l'ultimo aggiornamento anziché ricompilare l'intera finestra di sostituzione. L'aggiornamento incrementale richiede capacità di calcolo serverless.
Quando si applica l'aggiornamento incrementale
Tutte le condizioni seguenti devono essere vere:
- La pipeline viene eseguita su un'infrastruttura serverless.
- La forma di query è supportata. Vedere Aggiornamento incrementale per il set di operatori supportato.
- Il predicato fa riferimento alle colonne di base di una tabella di origine. I predicati sui valori derivati, ad esempio output di funzioni aggregate o finestra, non possono essere inseriti in un'origine, che disabilita l'aggiornamento incrementale.
- Nessun DML esterno ha modificato righe nella finestra di sostituzione corrente. Il DML che modifica le righe al di fuori della finestra corrente non viene interessato.
- La finestra di sostituzione corrente non include righe escluse dal predicato precedente. Se si estende il predicato per coprire un intervallo non elaborato in precedenza, tale aggiornamento torna alla ricompilazione completa. Gli aggiornamenti successivi possono nuovamente beneficiare dell'aggiornamento incrementale.
- Il predicato è deterministico. I predicati che usano funzioni non deterministiche come
rand()disabilitano l'aggiornamento incrementale. Sono consentite funzioni temporali,current_date()ad esempio .
Il primo aggiornamento di qualsiasi flusso è sempre un calcolo completo. Se una qualsiasi condizione non è soddisfatta, l'aggiornamento ripiega sul ricalcolo completo della finestra di sostituzione corrente.
Procedure consigliate per l'aggiornamento incrementale
Seguire queste linee guida in modo che i flussi REPLACE WHERE rimangano idonei per l'aggiornamento incrementale.
Usa un limite inferiore variabile
I predicati con un limite inferiore mobile rimangono idonei per l'aggiornamento incrementale a tempo indeterminato.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Un limite superiore mobile, come date BETWEEN date_add(current_date(), -7) AND current_date(), può spostare la finestra fino a includere righe precedentemente escluse, innescando un fallback una tantum alla ricomputazione completa.
Includere la colonna del predicato in GROUP BY
Durante l'aggregazione, includere la colonna del predicato in GROUP BY in modo che il motore possa spostare il predicato al di sotto dell'aggregazione.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Se la colonna del predicato non è presente in GROUP BY, il predicato non può essere inserito sotto l'aggregazione e l'origine viene analizzata completamente.
Includere la colonna del predicato nelle chiavi di join
Includi la colonna del predicato nella condizione di join in modo che il motore possa scartare tutte le origini incluse nel join.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Se una tabella unita non espone la colonna del predicato, tale tabella viene scansionata per intero a ogni aggiornamento.
Diagnostica del ripiego al ricalcolo completo
Quando un aggiornamento ricorre al ricalcolo completo, il motivo viene segnalato nell'evento planning_information del flusso. Vedi Monitorare i log degli eventi della pipeline. Nella tabella seguente sono elencati i motivi segnalati nell'evento :
| Motivo | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Un'operazione DML esterna ha modificato delle righe nella finestra di sostituzione corrente. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Il predicato usa espressioni non deterministiche. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
L'aggiornamento precedente usava un predicato non deterministico. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Non è possibile eseguire il push del predicato in qualsiasi origine, la finestra corrente include righe non elaborate dal predicato precedente oppure l'esecuzione usa un override del predicato. |
Examples
Negli esempi seguenti vengono illustrati modelli di flusso REPLACE WHERE comuni.
Esempio 1: Mantenere le aggregazioni cronologiche da un'origine di conservazione limitata
Questo esempio mantiene le aggregazioni giornaliere per un periodo illimitato, anche dopo che i dati non elaborati escono dalla tabella di origine (conservazione di 3 giorni):
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Esempio 2: Impedire la ricompilazione quando viene modificata una tabella delle dimensioni
Questo esempio mantiene invariate le righe dei fatti cronologici quando gli attributi della dimensione cambiano:
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Se l'area di un utente cambia, vengono ricalcolate solo le righe recenti. Le righe storiche mantengono il valore della regione al momento in cui sono state scritte.
Esempio 3: Aggiungere una nuova metrica senza ricompilare la cronologia completa
In questo esempio viene illustrato come evolvere una definizione di tabella ed eseguire il riempimento retroattivo solo per un intervallo specifico:
Definire la tabella iniziale:
CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Aggiornare la query per aggiungere
uniq_users:CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Le righe precedenti alla finestra di 7 giorni contengono
NULLperuniq_users.
Esempio 4: Eseguire l'iterazione in una piccola finestra prima di riempire la cronologia completa
In questo esempio viene illustrato come convalidare la logica di query in una finestra di dati di piccole dimensioni prima di elaborare l'intervallo cronologico completo.
Iniziare con una breve finestra per convalidare le metriche e scorrere la logica di business con costi di calcolo inferiori:
CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Una finestra breve ricomputa solo gli ultimi 7 giorni per ogni aggiornamento, quindi rivedere la query il maggior numero di volte necessario prima di eseguire il commit a un'esecuzione cronologica completa.
Dopo aver finalizzato la query, usare DML per riempire l'intervallo cronologico completo:
INSERT INTO revenue_attribution
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;