Usare la cronologia delle tabelle Delta Lake

Ogni operazione che modifica una tabella Delta Lake crea una nuova versione della tabella. È possibile usare le informazioni sulla cronologia per controllare le operazioni, eseguire il rollback di una tabella o eseguire query su una tabella in un momento specifico usando il tempo di spostamento.

Nota

Databricks non consiglia di usare la cronologia delle tabelle Delta Lake come soluzione di backup a lungo termine per l'archiviazione dei dati. Databricks consiglia di usare solo gli ultimi 7 giorni per le operazioni di spostamento cronologico, a meno che non siano state impostate entrambe le configurazioni di conservazione dei dati e dei log su un valore maggiore.

Recuperare la cronologia delle tabelle Delta

È possibile recuperare informazioni, incluse le operazioni, l'utente e il timestamp per ogni scrittura in una tabella Delta eseguendo il history comando . Le operazioni vengono restituite in ordine cronologico inverso.

La conservazione della cronologia tabelle è determinata dall'impostazione delta.logRetentionDurationdella tabella , ovvero 30 giorni per impostazione predefinita.

Nota

Lo spostamento cronologico e la cronologia delle tabelle sono controllati da soglie di conservazione diverse. Vedere Che cos'è lo spostamento cronologico di Delta Lake?.

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

Per informazioni dettagliate sulla sintassi di Spark SQL, vedere DESCRIBE HISTORY.For Spark SQL syntax details, see DESCRIBE HISTORY.

Per informazioni dettagliate sulla sintassi scala/Java/Python, vedere la documentazione dell'API Delta Lake.

Esplora cataloghi offre una visualizzazione visiva di queste informazioni dettagliate sulla tabella e sulla cronologia per le tabelle Delta. Oltre allo schema della tabella e ai dati di esempio, è possibile fare clic sulla scheda Cronologia per visualizzare la cronologia della tabella visualizzata con DESCRIBE HISTORY.

Schema cronologia

L'output dell'operazione history include le colonne seguenti.

Column Type Description
versione long Versione della tabella generata dall'operazione.
timestamp timestamp Quando è stato eseguito il commit di questa versione.
userId string ID dell'utente che ha eseguito l'operazione.
userName string Nome dell'utente che ha eseguito l'operazione.
operation (operazione) string Nome dell'operazione.
operationParameters map Parametri dell'operazione , ad esempio predicati.
processo struct Dettagli del processo che ha eseguito l'operazione.
notebook struct Dettagli del notebook da cui è stata eseguita l'operazione.
clusterId string ID del cluster in cui è stata eseguita l'operazione.
readVersion long Versione della tabella letta per eseguire l'operazione di scrittura.
isolationLevel string Livello di isolamento usato per questa operazione.
isBlindAppend boolean Indica se questa operazione ha accodato dati.
operationMetrics map Metriche dell'operazione (ad esempio, numero di righe e file modificati).
userMetadata string Metadati di commit definiti dall'utente se è stato specificato
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Nota

Chiavi delle metriche delle operazioni

L'operazione history restituisce una raccolta di metriche operative nella mappa delle operationMetrics colonne.

Le tabelle seguenti elencano le definizioni delle chiavi della mappa in base all'operazione.

Operazione Nome metrica Descrizione
WRITE, CREATE TABLE AS edizione Standard LECT, REPLACE TABLE AS edizione Standard LECT, COPY INTO
numFiles Numero di file scritti.
numOutputBytes Dimensioni in byte del contenuto scritto.
numOutputRows Numero di righe scritte.
STREAMING UPDATE
numAddedFiles Numero di file aggiunti.
numRemovedFiles Numero di file rimossi.
numOutputRows Numero di righe scritte.
numOutputBytes Dimensione della scrittura in byte.
DELETE
numAddedFiles Numero di file aggiunti. Non specificato quando vengono eliminate le partizioni della tabella.
numRemovedFiles Numero di file rimossi.
numDeletedRows Numero di righe rimosse. Non specificato quando vengono eliminate le partizioni della tabella.
numCopiedRows Numero di righe copiate nel processo di eliminazione dei file.
executionTimeMs Tempo impiegato per eseguire l'intera operazione.
scanTimeMs Tempo impiegato per analizzare i file per individuare le corrispondenze.
rewriteTimeMs Tempo impiegato per riscrivere i file corrispondenti.
TRUNCATE
numRemovedFiles Numero di file rimossi.
executionTimeMs Tempo impiegato per eseguire l'intera operazione.
MERGE
numSourceRows Numero di righe nel dataframe di origine.
numTargetRowsInserted Numero di righe inserite nella tabella di destinazione.
numTargetRowsUpdated Numero di righe aggiornate nella tabella di destinazione.
numTargetRowsDeleted Numero di righe eliminate nella tabella di destinazione.
numTargetRowsCopied Numero di righe di destinazione copiate.
numOutputRows Numero totale di righe scritte.
numTargetFilesAdded Numero di file aggiunti al sink(destinazione).
numTargetFilesRemoved Numero di file rimossi dal sink(destinazione).
executionTimeMs Tempo impiegato per eseguire l'intera operazione.
scanTimeMs Tempo impiegato per analizzare i file per individuare le corrispondenze.
rewriteTimeMs Tempo impiegato per riscrivere i file corrispondenti.
UPDATE
numAddedFiles Numero di file aggiunti.
numRemovedFiles Numero di file rimossi.
numUpdatedRows Numero di righe aggiornate.
numCopiedRows Numero di righe appena copiate nel processo di aggiornamento dei file.
executionTimeMs Tempo impiegato per eseguire l'intera operazione.
scanTimeMs Tempo impiegato per analizzare i file per individuare le corrispondenze.
rewriteTimeMs Tempo impiegato per riscrivere i file corrispondenti.
FSCK numRemovedFiles Numero di file rimossi.
CONVERT numConvertedFiles Numero di file Parquet convertiti.
OPTIMIZE
numAddedFiles Numero di file aggiunti.
numRemovedFiles Numero di file ottimizzati.
numAddedBytes Numero di byte aggiunti dopo l'ottimizzazione della tabella.
numRemovedBytes Numero di byte rimossi.
minFileSize Dimensioni del file più piccolo dopo l'ottimizzazione della tabella.
p25FileSize Dimensioni del 25° file percentile dopo l'ottimizzazione della tabella.
p50FileSize Dimensioni del file mediano dopo l'ottimizzazione della tabella.
p75FileSize Dimensioni del 75° file percentile dopo l'ottimizzazione della tabella.
Maxfilesize Dimensioni del file più grande dopo l'ottimizzazione della tabella.
CLONE
sourceTableSize Dimensioni in byte della tabella di origine nella versione clonata.
sourceNumOfFiles Numero di file nella tabella di origine nella versione clonata.
numRemovedFiles Numero di file rimossi dalla tabella di destinazione se è stata sostituita una tabella Delta precedente.
removedFilesSize Dimensioni totali in byte dei file rimossi dalla tabella di destinazione se è stata sostituita una tabella Delta precedente.
numCopiedFiles Numero di file copiati nel nuovo percorso. 0 per cloni superficiali.
copiedFilesSize Dimensioni totali in byte dei file copiati nel nuovo percorso. 0 per cloni superficiali.
RESTORE
tableSizeAfterRestore Dimensioni della tabella in byte dopo il ripristino.
numOfFilesAfterRestore Numero di file nella tabella dopo il ripristino.
numRemovedFiles Numero di file rimossi dall'operazione di ripristino.
numRestoredFiles Numero di file aggiunti in seguito al ripristino.
removedFilesSize Dimensioni in byte dei file rimossi dal ripristino.
restoredFilesSize Dimensioni in byte di file aggiunti dal ripristino.
VACUUM
numDeletedFiles Numero di file eliminati.
numVacuumedDirectories Numero di directory a vuoto.
numFilesToDelete Numero di file da eliminare.

Che cos'è il viaggio nel tempo di Delta Lake?

Delta Lake Time Travel supporta l'esecuzione di query sulle versioni precedenti della tabella in base al timestamp o alla versione della tabella (come registrato nel log delle transazioni). È possibile usare il tempo di viaggio per le applicazioni, ad esempio le seguenti:

  • Ricreare analisi, report o output (ad esempio, l'output di un modello di Machine Learning). Questo può essere utile per il debug o il controllo, in particolare nei settori regolamentati.
  • Scrivere query temporali complesse.
  • Correggere gli errori nei dati.
  • Fornire l'isolamento dello snapshot per un set di query per tabelle che cambiano rapidamente.

Importante

Le versioni delle tabelle accessibili con il tempo di spostamento sono determinate da una combinazione della soglia di conservazione per i file di log delle transazioni e la frequenza e la conservazione specificata per VACUUM le operazioni. Se si esegue VACUUM ogni giorno con i valori predefiniti, sono disponibili 7 giorni di dati per il viaggio nel tempo.

Sintassi dello spostamento cronologico di Delta Lake

Per eseguire una query su una tabella Delta, aggiungere una clausola dopo la specifica del nome della tabella.

  • timestamp_expression può essere uno qualsiasi di:
    • '2018-10-18T22:15:12.013Z'ovvero una stringa di cui è possibile eseguire il cast a un timestamp
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', ovvero una stringa di data
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Qualsiasi altra espressione che è o può essere eseguita il cast a un timestamp
  • version è un valore long che può essere ottenuto dall'output di DESCRIBE HISTORY table_spec.

timestamp_expressionversion possono essere sottoquery.

Vengono accettate solo stringhe di data o timestamp. Ad esempio, "2019-01-01" e "2019-01-01T00:00:00.000Z". Vedere il codice seguente per una sintassi di esempio:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

È anche possibile usare la @ sintassi per specificare il timestamp o la versione come parte del nome della tabella. Il timestamp deve essere in yyyyMMddHHmmssSSS formato . È possibile specificare una versione dopo @ anteponendo una v alla versione. Vedere il codice seguente per una sintassi di esempio:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

Che cosa sono i checkpoint del log delle transazioni?

Delta Lake registra le versioni della tabella come file JSON all'interno della directory, archiviate insieme ai dati della _delta_log tabella. Per ottimizzare l'esecuzione di query sui checkpoint, Delta Lake aggrega le versioni delle tabelle ai file di checkpoint Parquet, impedendo la necessità di leggere tutte le versioni JSON della cronologia delle tabelle. Azure Databricks ottimizza la frequenza di checkpoint per le dimensioni dei dati e il carico di lavoro. Gli utenti non devono interagire direttamente con i checkpoint. La frequenza del checkpoint è soggetta a modifiche senza preavviso.

Configurare la conservazione dei dati per le query di spostamento del tempo

Per eseguire query su una versione precedente della tabella, è necessario conservare sia il log che i file di dati per tale versione.

I file di dati vengono eliminati quando VACUUM vengono eseguiti su una tabella. Delta Lake gestisce automaticamente la rimozione dei file di log dopo il checkpoint delle versioni delle tabelle.

Poiché la maggior parte delle tabelle Delta è stata VACUUM eseguita regolarmente, le query temporizzato devono rispettare la soglia di conservazione per , ovvero 7 giorni per VACUUMimpostazione predefinita.

Per aumentare la soglia di conservazione dei dati per le tabelle Delta, è necessario configurare le proprietà della tabella seguenti:

  • delta.logRetentionDuration = "interval <interval>": controlla per quanto tempo viene mantenuta la cronologia di una tabella. Il valore predefinito è interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": determina l'utilizzo della soglia VACUUM per rimuovere i file di dati a cui non si fa più riferimento nella versione della tabella corrente. Il valore predefinito è interval 7 days.

È possibile specificare le proprietà Delta durante la creazione della tabella o impostarle con un'istruzione ALTER TABLE . Vedere Informazioni di riferimento sulle proprietà della tabella Delta.

Nota

È necessario impostare entrambe queste proprietà per garantire che la cronologia delle tabelle venga mantenuta per una durata più lunga per le tabelle con operazioni frequenti VACUUM . Ad esempio, per accedere a 30 giorni di dati cronologici, impostare delta.deletedFileRetentionDuration = "interval 30 days" (che corrisponde all'impostazione predefinita per delta.logRetentionDuration).

L'aumento della soglia di conservazione dei dati può causare l'aumento dei costi di archiviazione, man mano che vengono mantenuti più file di dati.

Ripristinare uno stato precedente di una tabella Delta

È possibile ripristinare uno stato precedente di una tabella Delta usando il RESTORE comando . Una tabella Delta gestisce internamente le versioni storiche della tabella che consentono di ripristinarla a uno stato precedente. Una versione corrispondente allo stato precedente o a un timestamp di quando è stato creato lo stato precedente sono supportate come opzioni dal comando RESTORE.

Importante

  • È possibile ripristinare una tabella già ripristinata.
  • È possibile ripristinare una tabella clonata .
  • È necessario disporre MODIFY dell'autorizzazione per la tabella da ripristinare.
  • Non è possibile ripristinare una tabella in una versione precedente in cui i file di dati sono stati eliminati manualmente o da vacuum. Il ripristino in questa versione parzialmente è comunque possibile se spark.sql.files.ignoreMissingFiles è impostato su true.
  • Il formato timestamp per il ripristino in uno stato precedente è yyyy-MM-dd HH:mm:ss. È supportata anche solo una stringa date(yyyy-MM-dd).
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

Per informazioni dettagliate sulla sintassi, vedere RESTORE.

Importante

Il ripristino viene considerato un'operazione di modifica dei dati. Le voci di log delta Lake aggiunte dal RESTORE comando contengono dataChange impostato su true. Se è presente un'applicazione downstream, ad esempio un processo di streaming strutturato che elabora gli aggiornamenti a una tabella Delta Lake, le voci del log delle modifiche dei dati aggiunte dall'operazione di ripristino vengono considerate come nuovi aggiornamenti dei dati e l'elaborazione può comportare dati duplicati.

Ad esempio:

Versione tabella Operazione Aggiornamenti del log differenziale Record negli aggiornamenti del log delle modifiche dei dati
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Victor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (nome = George, età = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Nessun record come ottimizzazione della compattazione non modifica i dati nella tabella)
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Victor, age = 29), (name = George, age = 55), (name = George, age = 39)

Nell'esempio precedente, il RESTORE comando genera aggiornamenti già visualizzati durante la lettura della tabella Delta versione 0 e 1. Se una query di streaming legge questa tabella, questi file verranno considerati come dati appena aggiunti e verranno elaborati di nuovo.

Ripristinare le metriche

RESTORE segnala le metriche seguenti come singolo dataframe di riga al termine dell'operazione:

  • table_size_after_restore: dimensioni della tabella dopo il ripristino.

  • num_of_files_after_restore: numero di file nella tabella dopo il ripristino.

  • num_removed_files: numero di file rimossi (eliminati logicamente) dalla tabella.

  • num_restored_files: numero di file ripristinati a causa del rollback.

  • removed_files_size: dimensione totale in byte dei file rimossi dalla tabella.

  • restored_files_size: dimensioni totali in byte dei file ripristinati.

    Esempio di metriche di ripristino

Esempi di utilizzo dello spostamento cronologico di Delta Lake

  • Correggere le eliminazioni accidentali in una tabella per l'utente 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Correzione di aggiornamenti accidentali non corretti in una tabella:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Eseguire una query sul numero di nuovi clienti aggiunti nell'ultima settimana.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Ricerca per categorie trovare la versione dell'ultimo commit nella sessione Spark?

Per ottenere il numero di versione dell'ultimo commit scritto dall'oggetto corrente SparkSession in tutti i thread e in tutte le tabelle, eseguire una query sulla configurazione spark.databricks.delta.lastCommitVersionInSessionSQL .

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Se non sono stati eseguiti commit da , l'esecuzione SparkSessiondi query sulla chiave restituisce un valore vuoto.

Nota

Se si condivide lo stesso SparkSession tra più thread, è simile alla condivisione di una variabile tra più thread. È possibile che si verifichino race condition man mano che il valore di configurazione viene aggiornato contemporaneamente.