Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa pagina descrive i modelli consigliati per la progettazione, la costruzione e l'operatività delle pipeline con le pipeline dichiarative Lakeflow Spark. Applicare queste linee guida all'avvio di una nuova pipeline o al miglioramento di una pipeline esistente.
Scegliere il tipo di set di dati corretto
Le pipeline dichiarative di Lakeflow Spark offrono tre tipi di set di dati: tabelle di streaming, viste materializzate e viste temporanee. La scelta del tipo corretto per ogni livello della pipeline evita i costi di calcolo non necessari e mantiene il codice facile da ragionare.
Le tabelle di streaming sono la scelta ideale per l'inserimento dati e le trasformazioni di streaming a bassa latenza. Ogni riga di input viene letta ed elaborata una sola volta, il che le rende ideali per carichi di lavoro di sola aggiunta, dati ad alto volume ed elaborazione basata su eventi da archiviazione cloud o bus di messaggi.
Le viste materializzate sono la scelta giusta per trasformazioni complesse e query analitiche. I risultati vengono pre-calcolati e mantenuti aggiornati usando l'aggiornamento incrementale, quindi le query su di esse sono veloci. Non è possibile modificare direttamente i dati in una vista materializzata. La definizione della query controlla l'output.
Le viste temporanee sono viste con ambito pipeline che organizzano la logica di trasformazione senza materializzare i dati nell'archiviazione. Usarli per i passaggi intermedi che non richiedono la propria tabella.
La tabella seguente riepiloga quando usare ogni tipo:
| caso d'uso | Tipo consigliato | Ragione |
|---|---|---|
| Ingestione da archivio cloud o bus di messaggi | Tabella di streaming | Elabora ogni record una volta; gestisce volumi elevati e carichi di lavoro solo di accodamento. |
| Flussi CDC (inserimenti, aggiornamenti, eliminazioni) | Tabella di streaming | Usato come destinazione di APPLY CHANGES INTO per l'inserimento CDC ordinato e deduplicato. |
| Aggregazioni complesse e interconnessioni | Vista materializzata | Aggiornamento incrementale; evita la ricompilazione completa in ogni aggiornamento. |
| Accelerazione delle query della dashboard | Vista materializzata | I risultati pre-calcolati rendono le query più veloci rispetto alle tabelle non elaborate. |
| Trasformazioni intermedie (nessun lettore downstream) | Visualizzazione temporanea | Organizza la logica della pipeline senza incorrere in costi di archiviazione. |
Per altre informazioni, vedere Tabelle di streaming, viste materializzate e Concetti relativi alle pipeline dichiarative di Lakeflow Spark.
Usare CDC dichiarativo anziché MERGE imperativo
L'implementazione di Change Data Capture (CDC) con istruzioni SQL MERGE imperative richiede codice personalizzato significativo per gestire correttamente l'ordinamento degli eventi, la deduplicazione, gli aggiornamenti parziali e l'evoluzione dello schema. Ognuno di questi problemi deve essere risolto in modo indipendente e il codice risultante è difficile da gestire e testare.
Le pipeline dichiarative di Lakeflow Spark forniscono l'istruzione APPLY CHANGES INTO (SQL) e la funzione apply_changes() (Python), che gestiscono in modo dichiarativo l'ordinamento, la deduplicazione, gli eventi non ordinati e l'evoluzione dello schema. Describi la forma del feed di modifiche e della tabella di destinazione, e la pipeline gestisce il resto.
APPLY CHANGES INTO supporta sia il tipo scD 1 (sovrascrittura) che il tipo 2 (conservazione della cronologia).
Per altre informazioni, vedere Cattura dei dati delle modifiche e snapshot e Le API AUTO CDC: Semplificare la cattura dei dati delle modifiche con le pipeline.
Garantire la qualità dei dati secondo le aspettative
Le aspettative sono espressioni SQL true/false applicate a ogni riga che passa attraverso un set di dati. Quando una riga non soddisfa la condizione, la pipeline risponde in base ai criteri di violazione configurati. Le aspettative generano metriche nel registro eventi della pipeline indipendentemente dai criteri, in modo da poter tenere traccia delle tendenze della qualità dei dati nel tempo.
Scegliere un criterio di violazione
Sono disponibili tre criteri di violazione. Scegli quello che corrisponde alla tua tolleranza verso i dati non validi.
- warn (impostazione predefinita): i record non validi vengono scritti nella tabella di destinazione e contrassegnati nelle metriche. Usare questo criterio quando è necessario acquisire tutti i dati, ma si vuole ottenere visibilità sui problemi di qualità.
- drop: i record non validi vengono eliminati prima della scrittura. Usa questa opzione quando sono previste righe errate che non devono propagarsi ulteriormente.
- fail: l'aggiornamento della pipeline si arresta nel primo record non valido. Usare questa opzione per i dati critici in cui qualsiasi record non valido indica un grave problema upstream.
Gli esempi seguenti illustrano ogni criterio applicato a una tabella di streaming:
SQL
-- Warn: write invalid records but track them in metrics
CREATE OR REFRESH STREAMING TABLE orders_raw (
CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL)
) AS SELECT * FROM STREAM read_files("/volumes/raw/orders", format => "json");
-- Drop: discard invalid records before writing
CREATE OR REFRESH STREAMING TABLE orders_clean (
CONSTRAINT non_negative_amount EXPECT (amount >= 0) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(orders_raw);
-- Fail: stop the pipeline on any invalid record
CREATE OR REFRESH STREAMING TABLE orders_critical (
CONSTRAINT required_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(orders_clean);
Python
from pyspark import pipelines as dp
# Warn: write invalid records but track them in metrics
@dp.table
@dp.expect("valid_order_id", "order_id IS NOT NULL")
def orders_raw():
return spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.load("/volumes/raw/orders")
# Drop: discard invalid records before writing
@dp.table
@dp.expect_or_drop("non_negative_amount", "amount >= 0")
def orders_clean():
return spark.readStream.table("orders_raw")
# Fail: stop the pipeline on any invalid record
@dp.table
@dp.expect_or_fail("required_customer_id", "customer_id IS NOT NULL")
def orders_critical():
return spark.readStream.table("orders_clean")
Mettere in quarantena record non validi
Quando si desidera conservare i record eliminati per l'analisi anziché eliminarli automaticamente, usare un modello di quarantena. Instradare le righe che non riescono a eseguire la convalida in una tabella di streaming separata usando due flussi: una che elimina righe non valide dalla tabella principale e una seconda che scrive solo le righe non valide in una tabella di quarantena. In questo modo è possibile analizzare, correggere e rielaborare i dati non corretti senza contaminare il set di dati pulito.
Per un esempio dettagliato del modello di quarantena, vedere Raccomandazioni sulle aspettative e modelli avanzati.
Per altre informazioni sulle aspettative, vedere Gestire la qualità dei dati con le aspettative della pipeline.
Parametrizza le tue pipeline
Le pipeline hanno impostazioni predefinite del catalogo e dello schema, quindi il codice che legge e scrive nello stesso catalogo e schema funziona in ambienti senza parametri. Tuttavia, se la pipeline deve fare riferimento a un secondo catalogo o schema, ad esempio quando si legge da un catalogo di origine condiviso che differisce tra l'ambiente di sviluppo e quello di produzione, evitare di codificare i nomi direttamente nel codice sorgente. Definirli invece come parametri di configurazione della pipeline (coppie chiave-valore impostate nelle impostazioni della pipeline) e farvi riferimento nel codice. In questo modo, una singola codebase viene eseguita correttamente tra gli ambienti scambiando i valori dei parametri.
SQL
CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id, COUNT(txn_id) AS txn_count, SUM(amount) AS total_amount
FROM ${source_catalog}.sales.transactions
GROUP BY account_id;
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import count, sum
@dp.materialized_view
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return spark.read.table(f"{source_catalog}.sales.transactions") \
.groupBy("account_id") \
.agg(
count("txn_id").alias("txn_count"),
sum("amount").alias("total_amount")
)
Per altre informazioni, vedere Usare i parametri con le pipeline.
Scegliere la modalità pipeline corretta per ogni ambiente
Modalità di aggiornamento per sviluppo e produzione
Le pipeline vengono eseguite in modalità di sviluppo o di aggiornamento di produzione . Scegliere la modalità corrispondente all'obiettivo.
In modalità di sviluppo, la pipeline riutilizza un cluster a esecuzione prolungata tra gli aggiornamenti e non riprova sugli errori. Questo accelera il ciclo di iterazione durante la creazione e il test del codice della pipeline perché si ottengono immediatamente i dettagli dell'errore senza attendere i riavvii del cluster.
In modalità di produzione, il cluster viene arrestato immediatamente dopo il completamento di ogni aggiornamento, riducendo così i costi di calcolo. La pipeline applica anche tentativi di ripetizione crescenti, inclusi i riavvii del cluster, per risolvere automaticamente gli errori transitori dell'infrastruttura. Usare la modalità di produzione per tutte le esecuzioni di pipeline pianificate.
Modalità di pipeline attivate e continue
La modalità attivata elabora tutti i dati disponibili e quindi si arresta. È la scelta giusta per la maggior parte delle pipeline: quelle eseguite in base a una pianificazione (oraria, giornaliera o su richiesta) e non richiedono una frequenza di aggiornamento dei dati inferiore al minuto.
La modalità continua mantiene il cluster in esecuzione ed elabora nuovi dati man mano che arriva. È appropriato solo quando il caso d'uso richiede la latenza nell'intervallo da secondi a minuti. Poiché la modalità continua richiede un cluster sempre attivo, è molto più costosa rispetto alla modalità attivata.
Per ulteriori informazioni, vedere Modalità pipeline: attivata vs. continua e Configura pipeline.
Usare il clustering liquido per il layout dei dati
Il clustering liquido sostituisce il partizionamento statico e ZORDER per ottimizzare il layout dei dati nelle tabelle Delta. A differenza del partizionamento, che richiede di scegliere una colonna di partizione in anticipo e può causare squilibrio dei dati quando i valori sono distribuiti in modo non uniforme, il liquid clustering si auto-ottimizza, è resistente agli squilibri ed è incrementale: vengono riscritti solo i dati che necessitano di riorganizzazione in ogni esecuzione.
Modificare le colonne di clustering in qualsiasi momento senza riscrivere la tabella completa man mano che i modelli di query si evolvono.
Definire le colonne di clustering nella definizione della tabella di streaming:
SQL
CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY (event_date, region)
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");
Python
from pyspark import pipelines as dp
@dp.table(cluster_by=["event_date", "region"])
def events():
return spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "parquet") \
.load("/volumes/raw/events")
Se non si è certi delle colonne da usare per cluster, usare CLUSTER BY AUTO per consentire a Databricks di selezionare automaticamente le colonne di clustering ottimali in base al carico di lavoro di query.
Per ulteriori informazioni, consultare Tabelle di streaming e Utilizzare clustering liquido per le tabelle.
Gestisci le pipeline con CI/CD e pacchetti di automazione dichiarativa
Controllare la versione del codice sorgente della pipeline e usare bundle di automazione dichiarativa per gestire le distribuzioni in ambienti diversi.
Per altre informazioni, vedere Creare una pipeline controllata dall'origine, Convertire una pipeline in un progetto di bundle e Usare i parametri con le pipeline.
Archiviare il codice della pipeline nel controllo della versione
Archiviare tutti i file di origine della pipeline (Python e SQL) insieme alla configurazione del bundle in un repository Git. Il controllo della versione del progetto completo offre una cronologia completa delle modifiche, semplifica la collaborazione e consente di convalidare le modifiche in un ambiente di sviluppo prima di promuoverle nell'ambiente di produzione.
Databricks consiglia pacchetti di automazione dichiarativi per la gestione di questo flusso di lavoro. Un bundle definisce la configurazione della pipeline in YAML insieme al codice sorgente e l'interfaccia della databricks bundle riga di comando consente di convalidare, distribuire ed eseguire pipeline dal terminale o da un sistema CI/CD.
Usa obiettivi bundle per l'isolamento dell'ambiente
I bundle consentono più destinazioni (ad esempio, dev, staging, ), prodognuna con il proprio set di override per i nomi di catalogo, i criteri del cluster, gli indirizzi di notifica e altre impostazioni. Combinare le destinazioni bundle con i parametri della pipeline per inserire i valori corretti specifici dell'ambiente in fase di distribuzione, mantenendo il codice sorgente privo di costanti di ambiente.
Un flusso di lavoro tipico è simile al seguente:
- Uno sviluppatore lavora su un feature branch, distribuendo in una pipeline di sviluppo personale in un dev catalog.
- Quando si effettua il merge nel ramo principale, un sistema CI esegue i processi
databricks bundle validateedatabricks bundle deploy --target stagingper convalidare e distribuire la pipeline in un ambiente di staging. - Al termine dei test, il sistema CI viene distribuito nell'ambiente di produzione con
databricks bundle deploy --target prod.
Procedure consigliate per lo streaming
Usare questi modelli per gestire lo stato, controllare i dati in ritardo e mantenere affidabili le pipeline di streaming.
Per altre informazioni, vedere Ottimizzare l'elaborazione stateful con marcatori di stato, Ripristinare una pipeline dall'errore del checkpoint di streaming e Riempimento retroattivo dei dati storici con le pipeline.
Usare filigrane per le operazioni con stato
Le filigrane vincolano lo stato che la pipeline mantiene in memoria durante le operazioni di streaming basate sullo stato, come ad esempio le aggregazioni basate su finestre e la deduplicazione. Senza un watermark, lo stato cresce incontrastato mentre la pipeline accumula dati per ogni possibile chiave, provocando alla fine errori di memoria nelle pipeline in esecuzione prolungata.
Una filigrana specifica una colonna timestamp e una soglia di tolleranza per i dati in ritardo. I record che arrivano dopo il superamento della soglia vengono eliminati. Scegliere una soglia che bilancia la tolleranza per i dati in ritardo rispetto al costo di memoria per mantenere lo stato aperto.
L'esempio seguente calcola un'aggregazione di finestra con intervallo fisso di un minuto con una marcatura temporale di tre minuti.
SQL
CREATE OR REFRESH STREAMING TABLE event_counts AS
SELECT window(event_time, '1 minute') AS time_window, region, COUNT(*) AS cnt
FROM STREAM(events_raw)
WATERMARK event_time DELAY OF INTERVAL 3 MINUTES
GROUP BY time_window, region;
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import window
@dp.table
def event_counts():
return (
spark.readStream.table("events_raw")
.withWatermark("event_time", "3 minutes")
.groupBy(window("event_time", "1 minute"), "region")
.count()
)
Annotazioni
Per assicurarsi che le aggregazioni vengano elaborate in modo incrementale anziché completamente ricalcolate in ogni aggiornamento, è necessario definire una filigrana.
Informazioni sullo stato di streaming e sull'aggiornamento completo
Lo stato di streaming è incrementale: la pipeline compila e mantiene lo stato tra gli aggiornamenti anziché ricompilare da zero ogni volta. Questo è ciò che rende efficiente lo streaming con stato, ma significa anche che se si modifica la logica di una query con stato (ad esempio, modificando una soglia limite o modificando le colonne di aggregazione), lo stato esistente non è più compatibile con la nuova logica. In questo caso, è necessario eseguire un aggiornamento completo per rielaborare tutti i dati cronologici con la nuova logica e ricompilare lo stato da zero.
Un aggiornamento completo può anche causare la perdita di dati se l'origine non conserva i dati cronologici. Ad esempio, un'origine Kafka con un breve periodo di conservazione può avere solo gli ultimi minuti di dati disponibili al momento dell'aggiornamento, generando una tabella che contiene dati molto meno di prima. Pianificare con attenzione le modifiche alla logica di interrogazione stateful, soprattutto per i flussi con volumi elevati per cui un aggiornamento completo è dispendioso o in cui la sorgente ha un tempo di conservazione dei dati limitato. L'utilizzo dell'architettura medallion aiuta a creare tabelle di bronzo con trasformazioni minime e permette di ricalcolare le tabelle di argento o oro dalle tabelle di bronzo, mantenendo l'intera cronologia.
Join tra stream e stream
I join di flusso richiedono una filigrana su entrambi i lati del join e una condizione di join con limiti di tempo. L'intervallo di tempo nella condizione di join indica al motore di streaming quando non sono possibili ulteriori corrispondenze, consentendogli di rimuovere lo stato che non può più essere confrontato. Se si omettono le filigrane o la condizione associata al tempo, lo stato cresce senza limiti.
L'esempio seguente unisce gli eventi di impression degli annunci con gli eventi click, richiedendo che il clic si verifichi entro tre minuti dall'impressione:
SQL
CREATE OR REFRESH STREAMING TABLE impression_clicks AS
SELECT imp.ad_id, imp.impression_time, clk.click_time
FROM STREAM(ad_impressions)
WATERMARK impression_time DELAY OF INTERVAL 3 MINUTES AS imp
JOIN STREAM(user_clicks)
WATERMARK click_time DELAY OF INTERVAL 3 MINUTES AS clk
ON imp.ad_id = clk.ad_id
AND clk.click_time BETWEEN imp.impression_time
AND imp.impression_time + INTERVAL 3 MINUTES;
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import expr
dp.create_streaming_table("impression_clicks")
@dp.append_flow(target="impression_clicks")
def join_impressions_and_clicks():
impressions = spark.readStream.table("ad_impressions") \
.withWatermark("impression_time", "3 minutes")
clicks = spark.readStream.table("user_clicks") \
.withWatermark("click_time", "3 minutes")
return impressions.alias("imp").join(
clicks.alias("clk"),
expr("""
imp.ad_id = clk.ad_id AND
clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL 3 MINUTES
"""),
"leftOuter"
)
Quando si aggiunge un flusso a una tabella statica (un join snapshot), lo snapshot della tabella statica viene aggiornato all'inizio di ogni microbatch. Ciò significa che i record delle dimensioni arrivati in ritardo non vengono applicati retroattivamente ai fatti già elaborati. Se è necessaria un'applicazione retroattiva, usare una vista materializzata o ristrutturare la pipeline.
Ottimizzare le prestazioni della pipeline
Applicare queste tecniche per ridurre i costi di calcolo e velocizzare gli aggiornamenti delle pipeline.
Per altre informazioni, vedere Viste materializzate e Ottimizzare l'elaborazione con stato con filigrane.
Evitare file di piccole dimensioni
L'attivazione di una pipeline troppo frequentemente in un'origine con volume ridotto scrive un numero elevato di file di piccole dimensioni nell'archiviazione cloud. I file di piccole dimensioni riducono le prestazioni di lettura perché ogni file richiede una ricerca di metadati e un round trip di I/O separati e le API di archiviazione cloud limitano le operazioni di presentazione su larga scala. Per evitare questo problema, scegliere un intervallo di trigger corrispondente al volume di dati: eseguire pipeline attivate in base a una pianificazione che consente di accumulare una quantità significativa di dati tra gli aggiornamenti, anziché continuamente.
Gestire il sbilanciamento dei dati
L'asimmetria dei dati si verifica quando i valori di una chiave join o groupBy vengono distribuiti in modo non uniforme tra le partizioni, causando un numero ridotto di attività per elaborare la maggior parte dei dati. Ciò genera hotspot che aumentano il tempo di aggiornamento end-to-end. Usare il clustering liquido per risolvere l'asimmetria nelle tabelle archiviate. Per l'asimmetria che si verifica durante il calcolo in anteprima, aggiungere un suffisso di bucket casuale prima di raggruppare e aggregare in due fasi le chiavi altamente asimmetriche.
Per altre informazioni, vedere Usare il clustering liquido per il layout dei dati.
Usare l'aggiornamento incrementale per le viste materializzate
Quando si usa una vista materializzata per un'aggregazione di grandi dimensioni, Le pipeline dichiarative di Lakeflow Spark tentano di aggiornarla in modo incrementale, elaborando solo le modifiche upstream dall'ultimo aggiornamento anziché ricompilare il set di risultati completo. L'aggiornamento incrementale è notevolmente più economico rispetto alla riesecuzione della query dall'inizio ad ogni attivazione della pipeline. Per massimizzare la probabilità che una vista materializzata possa essere aggiornata in modo incrementale, scrivere query di aggregazione semplici e deterministiche ed evitare costrutti che impediscono l'elaborazione incrementale, ad esempio funzioni non deterministiche.
Vedere aggiornamento incrementale per le viste materializzate.
Ottimizza i join
Per i join in cui un lato è una piccola tabella dimensionale, aggiungere un suggerimento di broadcast per istruire Spark a trasmettere la tabella più piccola a tutti gli executor anziché eseguire un join con shuffle.
SQL
CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT o.*, /*+ BROADCAST(p) */ p.product_name, p.category
FROM orders o
JOIN products p ON o.product_id = p.product_id;
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast
@dp.materialized_view
def enriched_orders():
orders = spark.read.table("orders")
products = spark.read.table("products")
return orders.join(broadcast(products), "product_id")
Per i join di prossimità su serie temporali, ad esempio per trovare l'evento più vicino all'interno di un intervallo di tempo, usare una condizione di join di intervallo e assicurarsi che entrambi i lati abbiano un watermark se si uniscono flussi, o considerare di pre-raggruppare gli eventi in contenitori temporali prima dell'aggiunta.
Monitorare le pipeline
Il log eventi della pipeline è la primitiva di osservabilità primaria nelle pipeline dichiarative di Lakeflow Spark. Ogni esecuzione della pipeline scrive record strutturati nel registro eventi che copre lo stato di esecuzione, i risultati delle aspettative sulla qualità dei dati, la derivazione dei dati e i dettagli degli errori. Il registro eventi è una tabella Delta che è possibile interrogare direttamente.
Per eseguire una query sul registro eventi senza conoscere il percorso di archiviazione sottostante, usare la event_log() funzione con valori di tabella in un cluster condiviso o in un sql warehouse:
SELECT * FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 100;
Creare dashboard di qualità dei dati eseguendo una query sul registro eventi per le metriche delle aspettative. La details colonna contiene una struttura JSON annidata con conteggi di pass/fail per ogni vincolo, che è possibile usare per tenere traccia delle tendenze di qualità nel tempo e avvisare le regressioni.
Per gli avvisi basati su eventi, usare hook di eventi per attivare webhook personalizzati o servizi di notifica (ad esempio Slack o PagerDuty) quando una pipeline ha esito negativo o quando viene superata una soglia di qualità dei dati. Gli hook di eventi sono funzioni Python eseguite in risposta agli eventi della pipeline.
Per altre informazioni, vedere Monitorare le pipeline, il registro eventi della pipeline e Definire il monitoraggio personalizzato delle pipeline con hook di eventi.
Usare l'ambiente di calcolo serverless
Databricks consiglia il calcolo serverless per le nuove pipeline. Con serverless, non è disponibile alcuna configurazione manuale del cluster: Databricks gestisce automaticamente l'infrastruttura. Le pipeline serverless utilizzano la scalabilità automatica avanzata che consente di ridimensionare orizzontalmente (più executor) e verticalmente (dimensioni maggiori dell'executor) in risposta alle richieste del carico di lavoro. Per impostazione predefinita, le pipeline serverless usano sempre il catalogo Unity, quindi la governance e il rilevamento della derivazione sono incorporati.
Per altre informazioni, vedere Configurare una pipeline serverless.
Organizzare le pipeline con l'architettura medallion
L'architettura medallion organizza i dati in tre livelli logici, ovvero bronzo, argento e oro, ognuno con uno scopo distinto. La mappatura dei tipi di dataset dichiarativi di Lakeflow Spark al livello appropriato mantiene chiare le responsabilità di ciascun livello e facilita la manutenzione delle pipeline.
- Bronze: usare le tabelle di streaming per inserire dati non elaborati da risorse di archiviazione cloud, bus di messaggi o origini CDC. Le tabelle bronze mantengono i dati di origine non elaborati con una trasformazione minima, consentendo ai livelli silver o gold di rielaborare dall'origine nel livello bronzo se i requisiti cambiano.
- Silver: usare le tabelle di streaming per trasformazioni incrementali a livello di riga (filtro, pulizia e analisi). Usare viste materializzate quando la logica silver-layer comporta join di arricchimento su tabelle delle dimensioni o aggregazioni complesse che traggono vantaggio dall'aggiornamento incrementale.
- Gold: Usare viste materializzate per pre-calcolare aggregazioni, metriche e riepiloghi forniti a dashboard, strumenti di reporting e consumer downstream.
Separare l'inserimento (bronzo) e la trasformazione (argento e oro) in distinti pipeline DAG, quando possibile. Il disaccoppiamento dei livelli consente di pianificare, monitorare e risolvere i problemi di ciascun livello separatamente e un errore in una pipeline di trasformazione non impedisce l'inserimento di nuovi dati nel livello bronzo.
Per altre informazioni, vedere Tabelle di streaming e viste materializzate.