Table utility commands

Delta tables support a number of utility commands.

Note

Some of the following code examples use a two-level namespace notation consisting of a schema (also called a database) and a table or view (for example, default.people10m). To use these examples with Unity Catalog, replace the two-level namespace with Unity Catalog three-level namespace notation consisting of a catalog, schema, and table or view (for example, main.default.people10m).

Remove files no longer referenced by a Delta table

You can remove files that are no longer referenced by a Delta table and that are older than the retention threshold by running the vacuum command on the table. vacuum is not triggered automatically. The default retention threshold for the files is 7 days. To change this behavior, see Data retention.

Important

  • vacuum removes all files from directories not managed by Delta Lake, ignoring directories beginning with _. If you are storing additional metadata like Structured Streaming checkpoints within a Delta table directory, use a directory name such as _checkpoints.
  • vacuum deletes only data files, not log files. Log files are deleted automatically and asynchronously after checkpoint operations. The default retention period of log files is 30 days, configurable through the delta.logRetentionDuration property, which you set with the ALTER TABLE SET TBLPROPERTIES SQL method (see the example after this list). See Table properties.
  • The ability to time travel back to a version older than the retention period is lost after running vacuum.
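
To change the log retention period mentioned in the list above, you can set the table property from any language that can issue SQL; a minimal Python sketch (the table name eventsTable and the 60-day interval are only illustrative):

spark.sql("ALTER TABLE eventsTable SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 60 days')")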

Note

When disk caching is enabled, a cluster might contain data from Parquet files that have been deleted with vacuum. Therefore, it may be possible to query the data of previous table versions whose files have been deleted. Restarting the cluster will remove the cached data. See Configure the disk cache.

SQL

VACUUM eventsTable   -- vacuum files not required by versions older than the default retention period

VACUUM '/data/events' -- vacuum files in path-based table

VACUUM delta.`/data/events/`

VACUUM delta.`/data/events/` RETAIN 100 HOURS  -- vacuum files not required by versions more than 100 hours old

VACUUM eventsTable DRY RUN    -- do dry run to get the list of files to be deleted

For Spark SQL syntax details, see

  • Databricks Runtime 7.x and above: VACUUM

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)  # path-based tables, or
deltaTable = DeltaTable.forName(spark, tableName)    # Hive metastore-based tables

deltaTable.vacuum()        # vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     # vacuum files not required by versions more than 100 hours old

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum()        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     // vacuum files not required by versions more than 100 hours old

Java

import io.delta.tables.*;
import org.apache.spark.sql.functions;

DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);

deltaTable.vacuum();        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100);     // vacuum files not required by versions more than 100 hours old

See the Delta Lake APIs for Scala, Java, and Python syntax details.

Warning

You should set the retention interval to at least 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If VACUUM cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when VACUUM deletes files that have not yet been committed. You must choose an interval that is longer than the longest running concurrent transaction and the longest period that any stream can lag behind the most recent update to the table.

Delta Lake has a safety check to prevent you from running a dangerous VACUUM command. If you are certain that there are no operations being performed on this table that take longer than the retention interval you plan to specify, you can turn off this safety check by setting the Spark configuration property spark.databricks.delta.retentionDurationCheck.enabled to false.
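
For example, a minimal Python sketch that disables the safety check for the current session and then runs an aggressive vacuum (the table name is illustrative, and the 0-hour retention is only for demonstration; see the warning above):

from delta.tables import DeltaTable

spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
DeltaTable.forName(spark, "eventsTable").vacuum(0)  # keep only the files required by the latest table version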

Audit information

VACUUM commits to the Delta transaction log contain audit information. You can query the audit events using DESCRIBE HISTORY.
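
For example, a minimal Python sketch that surfaces recent VACUUM commits from the table history (the table name is illustrative; VACUUM commits typically appear with operation names that start with VACUUM):

(spark.sql("DESCRIBE HISTORY eventsTable")
    .filter("operation LIKE 'VACUUM%'")
    .select("version", "timestamp", "operation", "operationParameters")
    .show(truncate=False))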

Retrieve Delta table history

You can retrieve information on the operations, user, timestamp, and so on for each write to a Delta table by running the history command. The operations are returned in reverse chronological order. By default table history is retained for 30 days.

SQL

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

For Spark SQL syntax details, see DESCRIBE HISTORY.

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)

fullHistoryDF = deltaTable.history()    # get the full history of the table

lastOperationDF = deltaTable.history(1) # get the last operation

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

val fullHistoryDF = deltaTable.history()    // get the full history of the table

val lastOperationDF = deltaTable.history(1) // get the last operation

Java

import io.delta.tables.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);

Dataset<Row> fullHistoryDF = deltaTable.history();       // get the full history of the table

Dataset<Row> lastOperationDF = deltaTable.history(1);    // fetch the last operation on the DeltaTable

See the Delta Lake APIs for Scala, Java, and Python syntax details.

History schema

The output of the history operation has the following columns.

Column | Type | Description
version | long | Table version generated by the operation.
timestamp | timestamp | When this version was committed.
userId | string | ID of the user that ran the operation.
userName | string | Name of the user that ran the operation.
operation | string | Name of the operation.
operationParameters | map | Parameters of the operation (for example, predicates).
job | struct | Details of the job that ran the operation.
notebook | struct | Details of the notebook from which the operation was run.
clusterId | string | ID of the cluster on which the operation ran.
readVersion | long | Version of the table that was read to perform the write operation.
isolationLevel | string | Isolation level used for this operation.
isBlindAppend | boolean | Whether this operation appended data.
operationMetrics | map | Metrics of the operation (for example, number of rows and files modified).
userMetadata | string | User-defined commit metadata if it was specified.
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Operation metrics keys

The history operation returns a collection of operation metrics in the operationMetrics column map.
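
For example, a minimal Python sketch that pulls one metric out of the map for each commit (the table name is illustrative; numOutputRows is one of the keys listed in the tables below and is null for operations that do not report it):

from delta.tables import DeltaTable
from pyspark.sql.functions import col

history_df = DeltaTable.forName(spark, "eventsTable").history()
history_df.select(
    "version",
    "operation",
    col("operationMetrics")["numOutputRows"].alias("numOutputRows"),  # null when the operation does not report this key
).show()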

The following tables list the map key definitions by operation.

Operation Metric name Description
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
numFiles Number of files written.
numOutputBytes Size in bytes of the written contents.
numOutputRows Number of rows written.
STREAMING UPDATE
numAddedFiles Number of files added.
numRemovedFiles Number of files removed.
numOutputRows Number of rows written.
numOutputBytes Size of write in bytes.
DELETE
numAddedFiles Number of files added. Not provided when partitions of the table are deleted.
numRemovedFiles Number of files removed.
numDeletedRows Number of rows removed. Not provided when partitions of the table are deleted.
numCopiedRows Number of rows copied in the process of deleting files.
executionTimeMs Time taken to execute the entire operation.
scanTimeMs Time taken to scan the files for matches.
rewriteTimeMs Time taken to rewrite the matched files.
TRUNCATE
numRemovedFiles Number of files removed.
executionTimeMs Time taken to execute the entire operation.
MERGE
numSourceRows Number of rows in the source DataFrame.
numTargetRowsInserted Number of rows inserted into the target table.
numTargetRowsUpdated Number of rows updated in the target table.
numTargetRowsDeleted Number of rows deleted in the target table.
numTargetRowsCopied Number of target rows copied.
numOutputRows Total number of rows written out.
numTargetFilesAdded Number of files added to the sink (target).
numTargetFilesRemoved Number of files removed from the sink (target).
executionTimeMs Time taken to execute the entire operation.
scanTimeMs Time taken to scan the files for matches.
rewriteTimeMs Time taken to rewrite the matched files.
UPDATE
numAddedFiles Number of files added.
numRemovedFiles Number of files removed.
numUpdatedRows Number of rows updated.
numCopiedRows Number of rows just copied over in the process of updating files.
executionTimeMs Time taken to execute the entire operation.
scanTimeMs Time taken to scan the files for matches.
rewriteTimeMs Time taken to rewrite the matched files.
FSCK numRemovedFiles Number of files removed.
CONVERT numConvertedFiles Number of Parquet files that have been converted.
OPTIMIZE
numAddedFiles Number of files added.
numRemovedFiles Number of files optimized.
numAddedBytes Number of bytes added after the table was optimized.
numRemovedBytes Number of bytes removed.
minFileSize Size of the smallest file after the table was optimized.
p25FileSize Size of the 25th percentile file after the table was optimized.
p50FileSize Median file size after the table was optimized.
p75FileSize Size of the 75th percentile file after the table was optimized.
maxFileSize Size of the largest file after the table was optimized.
Operation Metric name Description
CLONE (1)
sourceTableSize Size in bytes of the source table at the version that’s cloned.
sourceNumOfFiles Number of files in the source table at the version that’s cloned.
numRemovedFiles Number of files removed from the target table if a previous Delta table was replaced.
removedFilesSize Total size in bytes of the files removed from the target table if a previous Delta table was replaced.
numCopiedFiles Number of files that were copied over to the new location. 0 for shallow clones.
copiedFilesSize Total size in bytes of the files that were copied over to the new location. 0 for shallow clones.
RESTORE (2)
tableSizeAfterRestore Table size in bytes after restore.
numOfFilesAfterRestore Number of files in the table after restore.
numRemovedFiles Number of files removed by the restore operation.
numRestoredFiles Number of files that were added as a result of the restore.
removedFilesSize Size in bytes of files removed by the restore.
restoredFilesSize Size in bytes of files added by the restore.
VACUUM (3)
numDeletedFiles Number of deleted files.
numVacuumedDirectories Number of vacuumed directories.
numFilesToDelete Number of files to delete.

(1) Requires Databricks Runtime 7.3 LTS or above.

(2) Requires Databricks Runtime 7.4 or above.

(3) Requires Databricks Runtime 8.2 or above.

Retrieve Delta table details

You can retrieve detailed information about a Delta table (for example, number of files, data size) using DESCRIBE DETAIL.

SQL

DESCRIBE DETAIL '/data/events/'

DESCRIBE DETAIL eventsTable

For Spark SQL syntax details, see DESCRIBE DETAIL.

Python

Note

The Python API is available in Databricks Runtime 11.2 and above.

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)

detailDF = deltaTable.detail()

Scala

Note

The Scala API is available in Databricks Runtime 11.2 and above.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

val detailDF = deltaTable.detail()

Java

Note

The Java API is available in Databricks Runtime 11.2 and above.

import io.delta.tables.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);

Dataset<Row> detailDF = deltaTable.detail();

See the Delta Lake APIs for Scala, Java, and Python syntax details.

Detail schema

The output of this operation has only one row with the following schema.

Column | Type | Description
format | string | Format of the table, that is, delta.
id | string | Unique ID of the table.
name | string | Name of the table as defined in the metastore.
description | string | Description of the table.
location | string | Location of the table.
createdAt | timestamp | When the table was created.
lastModified | timestamp | When the table was last modified.
partitionColumns | array of strings | Names of the partition columns if the table is partitioned.
numFiles | long | Number of files in the latest version of the table.
sizeInBytes | int | The size of the latest snapshot of the table in bytes.
properties | string-string map | All the properties set for this table.
minReaderVersion | int | Minimum version of readers (according to the log protocol) that can read the table.
minWriterVersion | int | Minimum version of writers (according to the log protocol) that can write to the table.
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
|format|                  id|              name|description|            location|           createdAt|       lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
| delta|d31f82d2-a69f-42e...|default.deltatable|       null|file:/Users/tuor/...|2020-06-05 12:20:...|2020-06-05 12:20:20|              []|      10|      12345|        []|               1|               2|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
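
Because the result is a single-row DataFrame, you can pull out individual fields; a minimal Python sketch using two of the columns from the schema above:

from delta.tables import DeltaTable

detail_row = DeltaTable.forPath(spark, pathToTable).detail().first()   # single Row with the detail schema
print(detail_row["numFiles"], detail_row["sizeInBytes"])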

Convert a Parquet table to a Delta table

Convert a Parquet table to a Delta table in-place. This command lists all the files in the directory, creates a Delta Lake transaction log that tracks these files, and automatically infers the data schema by reading the footers of all Parquet files. If your data is partitioned, you must specify the schema of the partition columns as a DDL-formatted string (that is, <column-name1> <type>, <column-name2> <type>, ...).

Note

If a Parquet table was created by Structured Streaming, you can avoid listing all of the files by using the _spark_metadata sub-directory as the source of truth for the files contained in the table. To do so, set the SQL configuration spark.databricks.delta.convert.useMetadataLog to true.
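
For example, a minimal Python sketch (the path placeholder follows the convention used in the examples below):

from delta.tables import DeltaTable

spark.conf.set("spark.databricks.delta.convert.useMetadataLog", "true")
DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")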

SQL

-- Convert unpartitioned Parquet table at path '<path-to-table>'
CONVERT TO DELTA parquet.`<path-to-table>`

-- Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
CONVERT TO DELTA parquet.`<path-to-table>` PARTITIONED BY (part int, part2 int)

For syntax details, see CONVERT TO DELTA.

Python

from delta.tables import *

# Convert unpartitioned Parquet table at path '<path-to-table>'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")

# Convert partitioned parquet table at path '<path-to-table>' and partitioned by integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int")

Scala

import io.delta.tables._

// Convert unpartitioned Parquet table at path '<path-to-table>'
val deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")

// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
val partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int")

Java

import io.delta.tables.*;

// Convert unpartitioned Parquet table at path '<path-to-table>'
DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`");

// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
DeltaTable partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int");

Note

Any file not tracked by Delta Lake is invisible and can be deleted when you run vacuum. You should avoid updating or appending data files during the conversion process. After the table is converted, make sure all writes go through Delta Lake.

Convert an Iceberg table to a Delta table

Note

This feature is in Public Preview.

This feature is supported in Databricks Runtime 10.4 and above.

You can convert an Iceberg table to a Delta table in place if the underlying file format of the Iceberg table is Parquet. The following command creates a Delta Lake transaction log based on the Iceberg table’s native file manifest, schema and partitioning information. The converter also collects column stats during the conversion, unless NO STATISTICS is specified.

-- Convert the Iceberg table in the path <path-to-table>.
CONVERT TO DELTA iceberg.`<path-to-table>`

-- Convert the Iceberg table in the path <path-to-table> without collecting statistics.
CONVERT TO DELTA iceberg.`<path-to-table>` NO STATISTICS

Note

Converting Iceberg metastore tables is not supported.

Convert a Delta table to a Parquet table

You can convert a Delta table back to a Parquet table using the following steps (a short sketch follows the list):

  1. If you have performed Delta Lake operations that can change the data files (for example, delete or merge), run vacuum with retention of 0 hours to delete all data files that do not belong to the latest version of the table.
  2. Delete the _delta_log directory in the table directory.
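
Putting the two steps together, a minimal Python sketch (the path is a placeholder; disabling the retention check is required for a 0-hour vacuum, and dbutils is available in Databricks notebooks):

from delta.tables import DeltaTable

spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
DeltaTable.forPath(spark, "<path-to-table>").vacuum(0)   # step 1: remove files not in the latest version
dbutils.fs.rm("<path-to-table>/_delta_log", True)        # step 2: delete the transaction log directory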

Restore a Delta table to an earlier state

Note

Available in Databricks Runtime 7.4 and above.

You can restore a Delta table to an earlier state by using the RESTORE command. A Delta table internally maintains historic versions of the table that enable it to be restored to an earlier state. The RESTORE command accepts either the version number of the earlier state or a timestamp of when the earlier state was created.

Important

  • You can restore an already restored table.

  • You can restore a cloned table.

  • Restoring a table to an older version where the data files were deleted manually or by vacuum will fail. Partially restoring to this version is still possible if spark.sql.files.ignoreMissingFiles is set to true.

  • The timestamp format for restoring to an earlier state is yyyy-MM-dd HH:mm:ss. Providing only a date (yyyy-MM-dd) string is also supported.

SQL

RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, <path-to-table>)  # path-based tables, or
deltaTable = DeltaTable.forName(spark, <table-name>)    # Hive metastore-based tables

deltaTable.restoreToVersion(0) # restore table to oldest version

deltaTable.restoreToTimestamp('2019-02-14') # restore to a specific timestamp

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, <path-to-table>)  // path-based tables, or
val deltaTable = DeltaTable.forName(spark, <table-name>)     // Hive metastore-based tables

deltaTable.restoreToVersion(0) // restore table to oldest version

deltaTable.restoreToTimestamp("2019-02-14") // restore to a specific timestamp

Java

import io.delta.tables.*;

DeltaTable deltaTable = DeltaTable.forPath(spark, <path-to-table>);  // path-based tables, or
DeltaTable deltaTable = DeltaTable.forName(spark, <table-name>);     // Hive metastore-based tables

deltaTable.restoreToVersion(0); // restore table to oldest version

deltaTable.restoreToTimestamp("2019-02-14"); // restore to a specific timestamp

For syntax details, see RESTORE (Delta Lake on Azure Databricks).

Important

Restore is considered a data-changing operation. Delta Lake log entries added by the RESTORE command contain dataChange set to true. If a downstream application, such as a Structured Streaming job, processes the updates to a Delta Lake table, the data change log entries added by the restore operation are treated as new data updates, and processing them may result in duplicate data.

For example:

Table version | Operation | Delta log updates | Records in data change log updates
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55)
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39)
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (No records, because OPTIMIZE compaction does not change the data in the table)
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

In the preceding example, the RESTORE command results in updates that were already seen when reading Delta table versions 0 and 1. If a streaming query is reading this table, these files are treated as newly added data and are processed again.

Restore metrics

Note

Available in Databricks Runtime 8.2 and above.

RESTORE reports the following metrics as a single row DataFrame once the operation is complete:

  • table_size_after_restore: The size of the table after restoring.

  • num_of_files_after_restore: The number of files in the table after restoring.

  • num_removed_files: Number of files removed (logically deleted) from the table.

  • num_restored_files: Number of files restored due to rolling back.

  • removed_files_size: Total size in bytes of the files that are removed from the table.

  • restored_files_size: Total size in bytes of the files that are restored.

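For example, a minimal Python sketch that captures these metrics by running RESTORE through spark.sql (the path is a placeholder):

metrics_df = spark.sql("RESTORE TABLE delta.`/data/target/` TO VERSION AS OF 1")
metrics_df.show()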

Table access control

You must have MODIFY permission on the table being restored.

Find the last commit’s version in the Spark session

Note

Available in Databricks Runtime 7.1 and above.

To get the version number of the last commit written by the current SparkSession across all threads and all tables, query the SQL configuration spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

If no commits have been made by the SparkSession, querying the key returns an empty value.
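
For example, a minimal Python sketch that performs a write and then reads the last commit version (the table name is hypothetical):

spark.range(10).write.format("delta").mode("append").saveAsTable("default.events_copy")  # hypothetical table
print(spark.conf.get("spark.databricks.delta.lastCommitVersionInSession"))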

Note

If you share the same SparkSession across multiple threads, it’s similar to sharing a variable across multiple threads; you may hit race conditions as the configuration value is updated concurrently.