Work with table history

Each operation that modifies a table creates a new table version. Use history information to audit operations, rollback a table, or query a table at a specific point in time using time travel.

Note

Databricks doesn't recommend using table history as a long-term backup solution for data archival. Use only the past 7 days for time travel operations unless you have set both data and log retention configurations to a larger value.

Retrieve table history

Retrieve information including the operations, user, and timestamp for each write to a table by running the history command. The operations are returned in reverse chronological order.

Table history retention is determined by the table setting logRetentionDuration, which is 30 days by default.

Note

Time travel and table history are controlled by different retention thresholds. See What is time travel?.

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

For Spark SQL syntax details, see DESCRIBE HISTORY.

See the Delta Lake API documentation for Scala/Java/Python syntax details.

Catalog Explorer provides a visual view of this detailed table information and history. In addition to the table schema and sample data, you can click the History tab to see the table history that displays with DESCRIBE HISTORY.

History schema

The output of the history operation has the following columns.

Column Type Description
version long Table version generated by the operation.
timestamp timestamp When this version was committed.
userId string ID of the user that ran the operation.
userName string Name of the user that ran the operation.
operation string Name of the operation.
operationParameters map Parameters of the operation (for example, predicates.)
job struct Details of the Lakeflow job that ran the operation. Populates only for commits written from a Lakeflow job. Otherwise, null.
notebook struct Details of the Databricks notebook from which the operation was run. Populates only for commits written from a Databricks notebook. Otherwise, null.
clusterId string ID of the cluster on which the operation ran.
readVersion long Version of the table that was read to perform the write operation.
isolationLevel string Isolation level used for this operation.
isBlindAppend boolean Whether this operation appended data.
operationMetrics map Metrics of the operation (for example, number of rows and files modified.)
userMetadata string User-defined commit metadata if it was specified.
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Note

Understanding partitionBy in operation parameters

The partitionBy field is only meaningful for CREATE and OVERWRITE operations that define or change a table's partition schema.

For append operations to existing tables (APPEND, INSERT, UPDATE, DELETE, MERGE), this field might show an empty array [] or partition columns depending on the write method used (.save() vs .saveAsTable()). This inconsistency is expected behavior and should not be used to validate writes.

Important

Do not rely on partitionBy in history to validate append operations. The value varies based on implementation details but does not affect how data is written to partitions.

Example

Consider a table partitioned by the date column:

# Initial table creation - partitionBy is populated
df.write.format("delta") \
  .partitionBy("date") \
  .saveAsTable("sales_data")

The CREATE operation in history shows:

operationParameters: {
  "mode": "ErrorIfExists",
  "partitionBy": "[\"date\"]"
}

When you append data to this table:

# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
  .mode("append") \
  .saveAsTable("sales_data")

The APPEND operation shows:

operationParameters: {
  "mode": "Append",
  "partitionBy": "[]"
}

The empty partitionBy value is expected. The data is still written to the correct partitions based on the table's existing partition schema. Note that .save() to a path may show partition columns in this field, but this difference is an implementation detail and does not affect write behavior.

Operation metrics

The history operation returns a collection of operations metrics in the operationMetrics column map.

The following tables list the map key definitions by operation.

WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO

The following metrics are available for these operations:

Metric name Description
numFiles The number of files written.
numOutputBytes The size in bytes of the written contents.
numOutputRows The number of rows written.

STREAMING UPDATE

The following metrics are available for this operation:

Metric name Description
numAddedFiles The number of files added.
numRemovedFiles The number of files removed.
numOutputRows The number of rows written.
numOutputBytes The size of write in bytes.

DELETE

The following metrics are available for this operation:

Metric name Description
numAddedFiles The number of files added. Not provided when partitions of the table are deleted.
numRemovedFiles The number of files removed.
numDeletedRows The number of rows removed. Not provided when partitions of the table are deleted.
numCopiedRows The number of rows copied in the process of deleting files.
executionTimeMs The time taken to execute the entire operation.
scanTimeMs The time taken to scan the files for matches.
rewriteTimeMs The time taken to rewrite the matched files.

TRUNCATE

The following metrics are available for this operation:

Metric name Description
numRemovedFiles The number of files removed.
executionTimeMs The time taken to execute the entire operation.

MERGE

The following metrics are available for this operation:

Metric name Description
numSourceRows The number of rows in the source DataFrame.
numTargetRowsInserted The number of rows inserted into the target table.
numTargetRowsUpdated The number of rows updated in the target table.
numTargetRowsDeleted The number of rows deleted in the target table.
numTargetRowsCopied The number of target rows copied.
numOutputRows The total number of rows written out.
numTargetFilesAdded The number of files added to the sink (target).
numTargetFilesRemoved The number of files removed from the sink (target).
executionTimeMs The time taken to execute the entire operation.
scanTimeMs The time taken to scan the files for matches.
rewriteTimeMs The time taken to rewrite the matched files.

UPDATE

The following metrics are available for this operation:

Metric name Description
numAddedFiles The number of files added.
numRemovedFiles The number of files removed.
numUpdatedRows The number of rows updated.
numCopiedRows The number of rows just copied over in the process of updating files.
executionTimeMs The time taken to execute the entire operation.
scanTimeMs The time taken to scan the files for matches.
rewriteTimeMs The time taken to rewrite the matched files.

FSCK

The following metrics are available for this operation:

Metric name Description
numRemovedFiles The number of files removed.

CONVERT

The following metrics are available for this operation:

Metric name Description
numConvertedFiles The number of Parquet files that have been converted.

OPTIMIZE

The following metrics are available for this operation:

Metric name Description
numAddedFiles The number of files added.
numRemovedFiles The number of files optimized.
numAddedBytes The number of bytes added after the table was optimized.
numRemovedBytes The number of bytes removed.
minFileSize The size of the smallest file after the table was optimized.
p25FileSize The size of the 25th percentile file after the table was optimized.
p50FileSize The median file size after the table was optimized.
p75FileSize The size of the 75th percentile file after the table was optimized.
maxFileSize The size of the largest file after the table was optimized.

CLONE

The following metrics are available for this operation:

Metric name Description
sourceTableSize The size in bytes of the source table at the version that's cloned.
sourceNumOfFiles The number of files in the source table at the version that's cloned.
numRemovedFiles The number of files removed from the target table if a previous table was replaced.
removedFilesSize The total size in bytes of the files removed from the target table if a previous table was replaced.
numCopiedFiles The number of files that were copied over to the new location. 0 for shallow clones.
copiedFilesSize The total size in bytes of the files that were copied over to the new location. 0 for shallow clones.

RESTORE

The following metrics are available for this operation:

Metric name Description
tableSizeAfterRestore The table size in bytes after restore.
numOfFilesAfterRestore The number of files in the table after restore.
numRemovedFiles The number of files removed by the restore operation.
numRestoredFiles The number of files that were added as a result of the restore.
removedFilesSize The size in bytes of files removed by the restore.
restoredFilesSize The size in bytes of files added by the restore.

VACUUM

The following metrics are available for this operation:

Metric name Description
numDeletedFiles The number of deleted files.
numVacuumedDirectories The number of vacuumed directories.
numFilesToDelete The number of files to delete.

What is time travel?

Time travel supports querying previous table versions based on timestamp or table version (as recorded in the transaction log). You can use time travel for applications such as the following:

  • Re-creating analyses, reports, or outputs (for example, the output of a machine learning model). This could be useful for debugging or auditing, especially in regulated industries.
  • Writing complex temporal queries.
  • Fixing mistakes in your data.
  • Providing snapshot isolation for a set of queries for fast changing tables.

Important

In Databricks Runtime 18.0 and above, time travel queries are blocked if they request a version older than the deletedFileRetentionDuration table property (default 7 days). For Unity Catalog managed tables, this applies to Databricks Runtime 12.2 and above.

Time travel syntax

You query a table with time travel by adding a clause after the table name specification.

  • timestamp_expression can be any one of:
    • '2018-10-18T22:15:12.013Z', that is, a string that can be cast to a timestamp
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', that is, a date string
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Any other expression that is or can be cast to a timestamp
  • version is a long value that can be obtained from the output of DESCRIBE HISTORY table_spec.

Neither timestamp_expression nor version can be subqueries.

Only date or timestamp strings are accepted. For example, "2019-01-01" and "2019-01-01T00:00:00.000Z". See the following code for example syntax:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

You can also use the @ syntax to specify the timestamp or version as part of the table name. The timestamp must be in yyyyMMddHHmmssSSS format. You can specify a version after @ by prepending a v to the version. See the following code for example syntax:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

What are transaction log checkpoints?

Table versions are recorded as JSON files within the transaction log directory, which is stored alongside table data. To optimize checkpoint querying, table versions are aggregated to Parquet checkpoint files, preventing the need to read all JSON versions of table history. Azure Databricks optimizes checkpointing frequency for data size and workload. Users should not need to interact with checkpoints directly. The checkpoint frequency is subject to change without notice.

Configure data retention for time travel queries

To query a previous table version, you must retain both the log and the data files for that version.

Data files are deleted when VACUUM runs against a table. Log file removal is managed automatically after checkpointing table versions.

Because most tables have VACUUM run against them regularly, point-in-time queries should respect the retention threshold for VACUUM, which is 7 days by default.

To increase the data retention threshold for tables, you must configure the following table properties:

  • delta.logRetentionDuration = "interval <interval>": controls how long the history for a table is kept. The default is interval 30 days. For Apache Iceberg tables, use iceberg.logRetentionDuration instead.
  • delta.deletedFileRetentionDuration = "interval <interval>": determines the threshold VACUUM uses to remove data files no longer referenced in the current table version. The default is interval 7 days. For Apache Iceberg tables, use iceberg.deletedFileRetentionDuration instead.

You can specify table properties during table creation or set them with an ALTER TABLE statement. See Table properties reference.

Note

In Databricks Runtime 18.0 and above, logRetentionDuration must be greater than or equal to deletedFileRetentionDuration. For Unity Catalog managed tables, this applies to Databricks Runtime 12.2 and above.

To access 30 days of historical data, set delta.deletedFileRetentionDuration = "interval 30 days" (which matches the default setting for delta.logRetentionDuration).

Increasing data retention threshold can cause your storage costs to go up, as more data files are maintained.

Restore a table to an earlier state

You can restore a table to its earlier state by using the RESTORE command. Tables internally maintain historic versions that enable them to be restored to an earlier state. A version corresponding to the earlier state or a timestamp of when the earlier state was created are supported as options by the RESTORE command.

Important

  • You can restore an already restored table.
  • You can restore a cloned table.
  • You must have MODIFY permission on the table being restored.
  • You cannot restore a table to an older version where the data files were deleted manually or by vacuum. Restoring to this version partially is still possible if spark.sql.files.ignoreMissingFiles is set to true.
  • The timestamp format for restoring to an earlier state is yyyy-MM-dd HH:mm:ss. Providing only a date(yyyy-MM-dd) string is also supported.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

For syntax details, see RESTORE.

Important

Restore is considered a data-changing operation. Log entries added by the RESTORE command contain dataChange set to true. If there is a downstream application, such as a Structured streaming job that processes the updates to a table, the data change log entries added by the restore operation are considered as new data updates, and processing them may result in duplicate data.

For example:

Table version Operation Log updates Records in data change log updates
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) No records. OPTIMIZE compaction does not change the data in the table.
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

In the preceding example, the RESTORE command results in updates that were already seen when reading the table version 0 and 1. If a streaming query was reading this table, then these files will be considered as newly added data and will be processed again.

:::

Restore metrics

RESTORE reports the following metrics as a single row DataFrame once the operation is complete:

  • table_size_after_restore: The size of the table after restoring.

  • num_of_files_after_restore: The number of files in the table after restoring.

  • num_removed_files: Number of files removed (logically deleted) from the table.

  • num_restored_files: Number of files restored due to rolling back.

  • removed_files_size: Total size in bytes of the files that are removed from the table.

  • restored_files_size: Total size in bytes of the files that are restored.

    Restore metrics example

Examples of using time travel

  • Fix accidental deletes to a table for the user 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Fix accidental incorrect updates to a table:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Query the number of new customers added over the last week.

    SELECT
    (
      SELECT count(distinct userId)
      FROM my_table
    )
    -
    (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)
    ) AS new_customers
    

How do I find the last commit's version in the Spark session?

To get the version number of the last commit written by the current SparkSession across all threads and all tables, query the SQL configuration spark.databricks.delta.lastCommitVersionInSession.

Note

For Apache Iceberg tables, use spark.databricks.iceberg.lastCommitVersionInSession instead of spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

If no commits have been made by the SparkSession, querying the key returns an empty value.

Note

If you share the same SparkSession across multiple threads, it's similar to sharing a variable across multiple threads; you may hit race conditions as the configuration value is updated concurrently.