Condividi tramite


Procedure consigliate per Delta Lake

Questo articolo descrive le procedure consigliate per l'uso di Delta Lake.

Databricks consiglia di usare l'ottimizzazione predittiva. Vedere Ottimizzazione predittiva per Delta Lake.

Quando si elimina e ricrea una tabella nella stessa posizione, è consigliabile usare sempre un'istruzione CREATE OR REPLACE TABLE . Vedere Eliminare o sostituire una tabella Delta.

Rimuovere le configurazioni Delta legacy

Databricks consiglia di rimuovere le configurazioni Delta legacy più esplicite dalle configurazioni spark e dalle proprietà della tabella durante l'aggiornamento a una nuova versione di Databricks Runtime. Le configurazioni legacy possono impedire l'applicazione di nuove ottimizzazioni e valori predefiniti introdotti da Databricks ai carichi di lavoro migrati.

Usare il clustering liquido per ignorare i dati ottimizzati

Databricks consiglia di usare il clustering liquido anziché il partizionamento, l'ordine Z o altre strategie dell'organizzazione dei dati per ottimizzare il layout dei dati per ignorare i dati. Vedere Usare il clustering liquido per le tabelle Delta.

File compattati

L'ottimizzazione predittiva esegue OPTIMIZE automaticamente i comandi e VACUUM le tabelle gestite di Unity Catalog. Vedere Ottimizzazione predittiva per Delta Lake.

Databricks consiglia di eseguire di frequente il comando OPTIMIZE per compattare file di piccole dimensioni.

Nota

Questa operazione non rimuove i file precedenti. Per rimuoverli, eseguire il comando VACUUM .

Sostituire il contenuto o lo schema di una tabella

In alcuni casi può essere necessario sostituire una tabella Delta. Ad esempio:

  • I dati nella tabella non sono corretti e si desidera sostituire il contenuto.
  • Si vuole riscrivere l'intera tabella per apportare modifiche dello schema incompatibili, ad esempio la modifica dei tipi di colonna.

Anche se è possibile eliminare l'intera directory di una tabella Delta e creare una nuova tabella nello stesso percorso, non è consigliabile perché:

  • L'eliminazione di una directory non è efficiente. Una directory contenente file di grandi dimensioni può richiedere ore o persino giorni per l'eliminazione.
  • Si perde tutto il contenuto nei file eliminati; è difficile recuperare se si elimina la tabella errata.
  • L'eliminazione della directory non è atomica. Durante l'eliminazione della tabella una query simultanea che legge la tabella può non riuscire o visualizzare una tabella parziale.

Se non è necessario modificare lo schema della tabella, è possibile eliminare i dati da una tabella Delta e inserire i nuovi dati oppure aggiornare la tabella per correggere i valori non corretti.

Se si desidera modificare lo schema della tabella, è possibile sostituire l'intera tabella in modo atomico. Ad esempio:

Python

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table

SQL

REPLACE TABLE <your-table> USING DELTA AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA LOCATION "<your-table-path>" AS SELECT ... -- External table

Scala

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

Questo approccio offre diversi vantaggi:

  • La sovrascrittura di una tabella è molto più veloce perché non è necessario elencare la directory in modo ricorsivo o eliminare file.
  • La versione precedente della tabella esiste ancora. Se si elimina la tabella errata, è possibile recuperare facilmente i vecchi dati usando il tempo di spostamento. Vedere Usare la cronologia delle tabelle Delta Lake.
  • È un'operazione atomica. Le query simultanee possono comunque leggere la tabella durante l'eliminazione della tabella.
  • A causa delle garanzie di transazione Delta Lake ACID, se la sovrascrittura della tabella ha esito negativo, la tabella sarà nello stato precedente.

Inoltre, se si desidera eliminare i file obsoleti per risparmiare sui costi di archiviazione dopo la sovrascrittura della tabella, è possibile usare VACUUM per eliminarli. È ottimizzato per l'eliminazione di file ed è in genere più veloce rispetto all'eliminazione dell'intera directory.

Memorizzazione nella cache di Spark

Databricks non consiglia di usare la memorizzazione nella cache di Spark per i motivi seguenti:

  • Si perdono tutti i dati ignorati che possono provenire da filtri aggiuntivi aggiunti sopra all'oggetto memorizzato DataFramenella cache.
  • I dati memorizzati nella cache potrebbero non essere aggiornati se si accede alla tabella usando un identificatore diverso.

Differenze tra Delta Lake e Parquet in Apache Spark

Delta Lake gestisce automaticamente le operazioni seguenti. È consigliabile non eseguire mai queste operazioni manualmente:

  • REFRESH TABLE: le tabelle Delta restituiscono sempre le informazioni più aggiornate, quindi non è necessario chiamare REFRESH TABLE manualmente dopo le modifiche.
  • Aggiungere e rimuovere partizioni: Delta Lake tiene traccia automaticamente del set di partizioni presenti in una tabella e aggiorna l'elenco man mano che i dati vengono aggiunti o rimossi. Di conseguenza, non è necessario eseguire ALTER TABLE [ADD|DROP] PARTITION o MSCK.
  • Caricare una singola partizione: la lettura diretta delle partizioni non è necessaria. Ad esempio, non è necessario eseguire spark.read.format("parquet").load("/data/date=2017-01-01"). Usare invece una WHERE clausola per ignorare i dati, ad esempio spark.read.table("<table-name>").where("date = '2017-01-01'").
  • Non modificare manualmente i file di dati: Delta Lake usa il log delle transazioni per eseguire il commit delle modifiche apportate alla tabella in modo atomico. Non modificare, aggiungere o eliminare direttamente file di dati Parquet in una tabella Delta, perché ciò può causare la perdita di dati o il danneggiamento della tabella.

Migliorare le prestazioni per l'unione delta Lake

È possibile ridurre il tempo necessario per l'unione usando gli approcci seguenti:

  • Ridurre lo spazio di ricerca per le corrispondenze: per impostazione predefinita, l'operazione merge cerca nell'intera tabella Delta le corrispondenze nella tabella di origine. Un modo per velocizzare merge è ridurre lo spazio di ricerca aggiungendo vincoli noti nella condizione di corrispondenza. Si supponga, ad esempio, di avere una tabella partizionata da country e date che si vuole usare merge per aggiornare le informazioni relative all'ultimo giorno e a un paese specifico. L'aggiunta della condizione seguente rende la query più veloce, perché cerca le corrispondenze solo nelle partizioni pertinenti:

    events.date = current_date() AND events.country = 'USA'
    

    Inoltre, questa query riduce anche le probabilità di conflitti con altre operazioni simultanee. Per altri dettagli, vedere Livelli di isolamento e conflitti di scrittura in Azure Databricks .

  • File compattati: se i dati vengono archiviati in molti file di piccole dimensioni, la lettura dei dati per la ricerca di corrispondenze può diventare lenta. È possibile compattare file di piccole dimensioni in file di dimensioni maggiori per migliorare la velocità effettiva di lettura. Per informazioni dettagliate, vedere Ottimizzare il layout dei file di dati.

  • Controllare le partizioni casuali per le scritture: l'operazione merge sposta più volte i dati nel calcolo e scrive i dati aggiornati. Il numero di attività usate per la riproduzione casuale è controllato dalla configurazione spark.sql.shuffle.partitionsdella sessione Spark. L'impostazione di questo parametro non solo controlla il parallelismo, ma determina anche il numero di file di output. L'aumento del valore aumenta il parallelismo, ma genera anche un numero maggiore di file di dati più piccoli.

  • Abilitare le scritture ottimizzate: per le tabelle partizionate, merge può produrre un numero molto maggiore di file di piccole dimensioni rispetto al numero di partizioni casuali. Ciò è dovuto al fatto che ogni attività casuale può scrivere più file in più partizioni e può diventare un collo di bottiglia delle prestazioni. È possibile ridurre il numero di file abilitando le scritture ottimizzate. Vedere Scritture ottimizzate per Delta Lake in Azure Databricks.

  • Ottimizzare le dimensioni dei file nella tabella: Azure Databricks può rilevare automaticamente se una tabella Delta ha operazioni frequenti merge che riscrivono i file e può scegliere di ridurre le dimensioni dei file riscritti in previsione di ulteriori riscritture dei file in futuro. Per informazioni dettagliate, vedere la sezione sull'ottimizzazione delle dimensioni dei file.

  • Unione casuale bassa: l'unione casuale bassa offre un'implementazione ottimizzata di MERGE che offre prestazioni migliori per i carichi di lavoro più comuni. Inoltre, mantiene le ottimizzazioni esistenti del layout dei dati, ad esempio l'ordinamento Z sui dati non modificati.

Gestire la recency dei dati

All'inizio di ogni query, le tabelle Delta vengono aggiornate automaticamente alla versione più recente della tabella. Questo processo può essere osservato nei notebook quando lo stato del comando segnala: Updating the Delta table's state. Tuttavia, quando si esegue l'analisi cronologica in una tabella, potrebbero non essere necessari necessariamente dati aggiornati all'ultimo minuto, soprattutto per le tabelle in cui i dati di streaming vengono inseriti di frequente. In questi casi, le query possono essere eseguite su snapshot non aggiornati della tabella Delta. Questo approccio può ridurre la latenza per ottenere risultati dalle query.

È possibile configurare la tolleranza per i dati non aggiornati impostando la configurazione spark.databricks.delta.stalenessLimit della sessione Spark con un valore stringa di tempo, 1h ad esempio o 15m (rispettivamente per 1 ora o 15 minuti). Questa configurazione è specifica della sessione e non influisce sugli altri client che accedono alla tabella. Se lo stato della tabella è stato aggiornato entro il limite di decadimento, una query sulla tabella restituisce i risultati senza attendere l'aggiornamento della tabella più recente. Questa impostazione non impedisce mai l'aggiornamento della tabella e, quando vengono restituiti dati non aggiornati, i processi di aggiornamento in background. Se l'ultimo aggiornamento della tabella è precedente al limite di decadimento, la query non restituisce risultati fino al completamento dell'aggiornamento dello stato della tabella.

Checkpoint avanzati per query a bassa latenza

Delta Lake scrive i checkpoint come stato aggregato di una tabella Delta a una frequenza ottimizzata. Questi checkpoint fungono da punto di partenza per calcolare lo stato più recente della tabella. Senza checkpoint, Delta Lake dovrà leggere una raccolta di file JSON di grandi dimensioni ("file delta") che rappresenta i commit nel log delle transazioni per calcolare lo stato di una tabella. Inoltre, le statistiche a livello di colonna usate da Delta Lake per eseguire lo skipping dei dati vengono archiviate nel checkpoint.

Importante

I checkpoint Delta Lake sono diversi dai checkpoint di Structured Streaming.

Le statistiche a livello di colonna vengono archiviate come struct e json (per compatibilità con le versioni precedenti). Il formato dello struct rende le letture Delta Lake molto più veloci, perché:

  • Delta Lake non esegue l'analisi JSON costosa per ottenere statistiche a livello di colonna.
  • Le funzionalità di eliminazione delle colonne Parquet riducono significativamente le operazioni di I/O necessarie per leggere le statistiche per una colonna.

Il formato dello struct consente una raccolta di ottimizzazioni che riducono il sovraccarico delle operazioni di lettura Delta Lake da secondi a decine di millisecondi, riducendo significativamente la latenza per le query brevi.

Gestire le statistiche a livello di colonna nei checkpoint

È possibile gestire la modalità di scrittura delle statistiche nei checkpoint usando le proprietà delta.checkpoint.writeStatsAsJson della tabella e delta.checkpoint.writeStatsAsStruct. Se entrambe le proprietà della tabella sono false, Delta Lake non può eseguire l'ignoramento dei dati.

  • Batch scrive statistiche di scrittura sia in formato JSON che in formato struct. delta.checkpoint.writeStatsAsJson è .true
  • delta.checkpoint.writeStatsAsStruct è non definito per impostazione predefinita.
  • I lettori usano la colonna struct quando disponibile e in caso contrario eseguire il fallback all'uso della colonna JSON.

Importante

I checkpoint avanzati non interrompono la compatibilità con i lettori Delta Lake open source. Tuttavia, l'impostazione delta.checkpoint.writeStatsAsJson di su false può avere implicazioni sui lettori Delta Lake proprietari. Per altre informazioni sulle implicazioni sulle prestazioni, contattare i fornitori.

Abilitare checkpoint avanzati per le query structured streaming

Se i carichi di lavoro Structured Streaming non hanno requisiti di bassa latenza (latenze secondarie), è possibile abilitare checkpoint avanzati eseguendo il comando SQL seguente:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

È anche possibile migliorare la latenza di scrittura del checkpoint impostando le proprietà della tabella seguenti:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

Se i dati ignorati non sono utili nell'applicazione, è possibile impostare entrambe le proprietà su false. Non vengono quindi raccolte o scritte statistiche. Databricks non consiglia questa configurazione.