Best practices: Delta Lake

This article describes best practices when using Delta Lake.

Provide data location hints

If you expect a column to be commonly used in query predicates and that column has high cardinality (that is, a large number of distinct values), use Z-ORDER BY. When you Z-order by that column, Delta Lake lays out the data in the files based on the column values and automatically uses this layout information to skip irrelevant data at query time.

For details, see Data skipping with Z-order indexes for Delta Lake.
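A minimal sketch of Z-ordering from PySpark, assuming an active SparkSession named `spark` and a Delta table you are allowed to rewrite. The helper names `zorder_sql` and `zorder_table`, the `events` table, and its columns are hypothetical; the helpers only assemble and run an `OPTIMIZE ... ZORDER BY` statement. The optional `WHERE` clause restricts the rewrite to a subset of the data and should reference partition columns only.

```python
from typing import Optional, Sequence


def zorder_sql(table: str, columns: Sequence[str], where: Optional[str] = None) -> str:
    """Build an OPTIMIZE statement that Z-orders `table` by `columns` (hypothetical helper)."""
    if not columns:
        raise ValueError("ZORDER BY needs at least one column")
    stmt = f"OPTIMIZE {table}"
    if where:
        # Limit the rewrite to matching partitions; the predicate should use partition columns.
        stmt += f" WHERE {where}"
    return f"{stmt} ZORDER BY ({', '.join(columns)})"


def zorder_table(spark, table: str, columns: Sequence[str], where: Optional[str] = None):
    """Run the statement on an active SparkSession and return the resulting DataFrame."""
    return spark.sql(zorder_sql(table, columns, where))


# Example: Z-order a hypothetical `events` table by a high-cardinality column.
print(zorder_sql("events", ["userId"], where="date >= '2017-01-01'"))
```

Choose columns that appear frequently in filters; Z-ordering by a column that queries rarely filter on costs a rewrite without improving data skipping.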

Compact files

If you continuously write data to a Delta table, it accumulates a large number of files over time, especially if you add data in small batches. This can reduce the efficiency of table reads, and it can also affect the performance of your file system. Ideally, you should regularly rewrite a large number of small files into a smaller number of larger files. This is known as compaction.

You can compact a table using the OPTIMIZE command.

Note

This operation does not remove the old files. To remove them, run the VACUUM command.
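The compaction workflow above can be sketched in PySpark as follows, assuming an active SparkSession named `spark`. The helpers `optimize_sql`, `vacuum_sql`, and `compact_and_clean` are hypothetical names; they build the `OPTIMIZE` and `VACUUM` statements described in this section, with `VACUUM`'s optional `RETAIN ... HOURS` and `DRY RUN` clauses.

```python
from typing import Optional


def optimize_sql(table: str, where: Optional[str] = None) -> str:
    """Build a plain OPTIMIZE (compaction) statement (hypothetical helper)."""
    return f"OPTIMIZE {table}" + (f" WHERE {where}" if where else "")


def vacuum_sql(table: str, retain_hours: Optional[int] = None, dry_run: bool = False) -> str:
    """Build a VACUUM statement; without RETAIN, the default retention threshold applies."""
    stmt = f"VACUUM {table}"
    if retain_hours is not None:
        stmt += f" RETAIN {retain_hours} HOURS"
    if dry_run:
        # DRY RUN lists files that would be deleted without deleting them.
        stmt += " DRY RUN"
    return stmt


def compact_and_clean(spark, table: str, retain_hours: Optional[int] = None) -> None:
    """Compact small files, then delete files the table no longer references."""
    spark.sql(optimize_sql(table))
    # OPTIMIZE leaves the replaced files in place; VACUUM removes only
    # unreferenced files older than the retention threshold.
    spark.sql(vacuum_sql(table, retain_hours))
```

Because VACUUM only deletes unreferenced files older than the retention threshold (7 days by default), files replaced by a recent OPTIMIZE are not removed right away.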

Replace the content or schema of a table

Sometimes you may want to replace a Delta table. For example:

  • You discover the data in the table is incorrect and want to replace the content.
  • You want to rewrite the whole table to do incompatible schema changes (such as changing column types).

While you can delete the entire directory of a Delta table and create a new table on the same path, it’s not recommended because:

  • Deleting a directory is not efficient. A directory containing very large files can take hours or even days to delete.
  • You lose all of the content in the deleted files, and it’s hard to recover if you delete the wrong table.
  • The directory deletion is not atomic. While you are deleting the table, a concurrent query reading the table can fail or see a partial table.

If you don’t need to change the table schema, you can delete data from a Delta table and insert your new data, or update the table to fix the incorrect values.
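A minimal sketch of both options, assuming an active SparkSession named `spark`. The helper names, the `events` table, and the `fixed_events` source are hypothetical; the helpers only build standard `DELETE`, `INSERT INTO ... SELECT`, and `UPDATE` statements.

```python
from typing import List, Mapping


def delete_then_insert_sql(table: str, condition: str, source: str) -> List[str]:
    """Statements that remove incorrect rows and insert corrected ones (hypothetical helper)."""
    return [
        f"DELETE FROM {table} WHERE {condition}",
        # Assumes `source` has the same columns, in the same order, as `table`.
        f"INSERT INTO {table} SELECT * FROM {source}",
    ]


def update_sql(table: str, assignments: Mapping[str, str], condition: str) -> str:
    """Build an UPDATE; values in `assignments` are SQL expressions, not Python values."""
    sets = ", ".join(f"{column} = {expr}" for column, expr in assignments.items())
    return f"UPDATE {table} SET {sets} WHERE {condition}"


def run_all(spark, statements: List[str]) -> None:
    # Each statement commits as its own Delta transaction.
    for stmt in statements:
        spark.sql(stmt)
```

Note that the DELETE and the INSERT commit as two separate transactions, so a concurrent reader can briefly see the table without the deleted rows. A single UPDATE, or a single MERGE, keeps the fix to one transaction.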

If you want to change the table schema, you can replace the whole table atomically. For example:

Python

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table

SQL

REPLACE TABLE <your-table> USING DELTA AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA LOCATION "<your-table-path>" AS SELECT ... -- External table

Scala

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

This approach has multiple benefits:

  • Overwriting a table is much faster because it doesn’t need to list the directory recursively or delete any files.
  • The old version of the table still exists. If you delete the wrong table you can easily retrieve the old data using Time Travel.
  • It’s an atomic operation. Concurrent queries can still read the table while you are overwriting it.
  • Because of Delta Lake ACID transaction guarantees, if overwriting the table fails, the table will be in its previous state.

In addition, if you want to delete old files to save storage cost after overwriting the table, you can use VACUUM to delete them. It’s optimized for file deletion and usually faster than deleting the entire directory.
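To retrieve the data as it was before an overwrite, you can find the earlier version in the table history and query it with time travel. A minimal sketch, assuming an active SparkSession named `spark`; the helper names and the `events` table are hypothetical, while `DESCRIBE HISTORY`, `VERSION AS OF`, and `TIMESTAMP AS OF` are the Delta Lake SQL forms being assembled.

```python
from typing import Optional


def history_sql(table: str) -> str:
    """List the table's versions and the operation that produced each one."""
    return f"DESCRIBE HISTORY {table}"


def time_travel_sql(table: str, version: Optional[int] = None,
                    timestamp: Optional[str] = None) -> str:
    """Query an earlier snapshot by version number or by timestamp (exactly one is required)."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        return f"SELECT * FROM {table} VERSION AS OF {version}"
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{timestamp}'"


def read_previous_version(spark, table: str, version: int):
    """Return the data as it was at `version`, for example the version before an overwrite."""
    return spark.sql(time_travel_sql(table, version=version))


# Usage on a cluster (not executed here):
#   spark.sql(history_sql("events")).show()   # find the version before the overwrite
#   old = read_previous_version(spark, "events", 3)
```

Time travel only works while the old data files still exist: once VACUUM deletes files older than the retention threshold, versions that depend on them can no longer be read.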

Spark caching

Databricks does not recommend that you use Spark caching for the following reasons:

  • You lose any data skipping that can come from additional filters added on top of the cached DataFrame.
  • The data that gets cached might not be updated if the table is accessed using a different identifier (for example, you call spark.table(x).cache() but then write to the table using df.write.save("/some/path")).

Differences between Delta Lake and Parquet on Apache Spark

Delta Lake handles the following operations automatically. You should never perform these operations manually:

  • REFRESH TABLE: Delta tables always return the most up-to-date information, so there is no need to call REFRESH TABLE manually after changes.
  • Add and remove partitions: Delta Lake automatically tracks the set of partitions present in a table and updates the list as data is added or removed. As a result, there is no need to run ALTER TABLE [ADD|DROP] PARTITION or MSCK.
  • Load a single partition: Reading partitions directly is not necessary. For example, you don’t need to run spark.read.format("parquet").load("/data/date=2017-01-01"). Instead, use a WHERE clause for data skipping, such as spark.read.table("<table_name>").where("date = '2017-01-01'").
  • Don’t manually modify data files: Delta Lake uses the transaction log to commit changes to the table atomically. Do not directly modify, add, or delete Parquet data files in a Delta table, because this can lead to lost data or table corruption.

Improve performance for Delta Lake merge

You can reduce the time taken by merge using the following approaches:

  • Reduce the search space for matches: By default, the merge operation searches the entire Delta table to find matches in the source table. One way to speed up merge is to reduce the search space by adding known constraints in the match condition. For example, suppose you have a table that is partitioned by country and date and you want to use merge to update information for the last day and a specific country. Adding the following condition makes the query faster, as it looks for matches only in the relevant partitions:

    events.date = current_date() AND events.country = 'USA'
    

    Furthermore, this query also reduces the chances of conflicts with other concurrent operations. See Isolation levels and write conflicts on Azure Databricks for more details.

  • Compact files: If the data is stored in many small files, reading the data to search for matches can become slow. You can compact small files into larger files to improve read throughput. See Compact data files with optimize on Delta Lake for details.

  • Control the shuffle partitions for writes: The merge operation shuffles data multiple times to compute and write the updated data. The number of tasks used to shuffle is controlled by the Spark session configuration spark.sql.shuffle.partitions. Setting this parameter not only controls the parallelism but also determines the number of output files. Increasing the value increases parallelism but also generates a larger number of smaller data files.

  • Enable optimized writes: For partitioned tables, merge can produce a much larger number of small files than the number of shuffle partitions, because every shuffle task can write multiple files in multiple partitions. These small files can become a performance bottleneck. You can reduce the number of files by enabling Optimized Write.

    Note

    In Databricks Runtime 7.4 and above, Optimized Write is automatically enabled in merge operations on partitioned tables.

  • Tune file sizes in table: In Databricks Runtime 8.2 and above, Azure Databricks can automatically detect if a Delta table has frequent merge operations that rewrite files and may choose to reduce the size of rewritten files in anticipation of further file rewrites in the future. See the section on tuning file sizes for details.

  • Low Shuffle Merge: In Databricks Runtime 9.0 and above, Low Shuffle Merge provides an optimized implementation of MERGE that delivers better performance for most common workloads. In addition, it preserves existing data layout optimizations, such as Z-ordering, on unmodified data.
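The first and third techniques above can be sketched together in PySpark, assuming an active SparkSession named `spark`, a target table `events` partitioned by `date` and `country`, and a source view `updates` with the same schema (all hypothetical). `merge_sql` adds the known partition constraints to the ON clause; `merge_session_conf` collects the shuffle-partition setting and the Optimized Write session flag `spark.databricks.delta.optimizeWrite.enabled`, whose name you should confirm against the documentation for your runtime.

```python
from typing import Dict, Sequence


def merge_sql(target: str, source: str, key_condition: str,
              pruning_conditions: Sequence[str] = ()) -> str:
    """Build an upsert MERGE whose ON clause also carries known constraints (hypothetical helper).

    Extra conditions on partition columns let Delta Lake search for matches
    only in the relevant partitions instead of the whole target table.
    """
    on = " AND ".join([key_condition, *pruning_conditions])
    return (
        f"MERGE INTO {target} USING {source} ON {on} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )


def merge_session_conf(shuffle_partitions: int, optimize_write: bool = True) -> Dict[str, str]:
    """Session settings for merge tuning (hypothetical helper; verify key names for your runtime)."""
    return {
        # Controls merge parallelism and, indirectly, the number of output files.
        "spark.sql.shuffle.partitions": str(shuffle_partitions),
        # Optimized Write; already on for merges into partitioned tables in DBR 7.4+.
        "spark.databricks.delta.optimizeWrite.enabled": str(optimize_write).lower(),
    }


def apply_conf(spark, conf: Dict[str, str]) -> None:
    for key, value in conf.items():
        spark.conf.set(key, value)


# Usage on a cluster (not executed here):
#   apply_conf(spark, merge_session_conf(64))
#   spark.sql(merge_sql("events", "updates", "events.eventId = updates.eventId",
#                       ["events.date = current_date()", "events.country = 'USA'"]))
```

`UPDATE SET *` and `INSERT *` assume the source has the same columns as the target; list the columns explicitly if they differ.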

Manage data recency

At the beginning of each query, Delta tables auto-update to the latest version of the table. This process can be observed in notebooks when the command status reports: Updating the Delta table's state. However, when running historical analysis on a table, you may not necessarily need up-to-the-last-minute data, especially for tables where streaming data is being ingested frequently. In these cases, queries can be run on stale snapshots of your Delta table. This approach can lower latency in getting results from queries.

You can configure how stale your table data can be by setting the Spark session configuration spark.databricks.delta.stalenessLimit to a time string value, for example 1h, 15m, or 1d for 1 hour, 15 minutes, or 1 day, respectively. This configuration is session-specific, so it doesn’t affect other users accessing this table from other notebooks, jobs, or BI tools. In addition, this setting doesn’t prevent your table from updating; it only prevents a query from having to wait for the table to update. The update still occurs in the background and shares resources fairly across the cluster. If the staleness limit is exceeded, the query blocks on the table state update.
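Because the setting is session-scoped, one way to apply it only to a batch of historical queries is a context manager that sets `spark.databricks.delta.stalenessLimit` and restores the previous value afterward. A minimal sketch, assuming an active SparkSession named `spark`; `staleness_limit` and the `events` table are hypothetical names, while `spark.conf.get`, `set`, and `unset` are standard PySpark session-configuration calls.

```python
from contextlib import contextmanager
from typing import Iterator

STALENESS_KEY = "spark.databricks.delta.stalenessLimit"


@contextmanager
def staleness_limit(spark, limit: str) -> Iterator[None]:
    """Temporarily let queries in this session read snapshots up to `limit` old.

    `limit` is a time string such as "1h", "15m", or "1d". The previous session
    value is restored (or the key is unset) when the block exits, even on error.
    """
    previous = spark.conf.get(STALENESS_KEY, None)
    spark.conf.set(STALENESS_KEY, limit)
    try:
        yield
    finally:
        if previous is None:
            spark.conf.unset(STALENESS_KEY)
        else:
            spark.conf.set(STALENESS_KEY, previous)


# Usage on a cluster (not executed here):
#   with staleness_limit(spark, "1h"):
#       daily = spark.read.table("events").groupBy("date").count().collect()
```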

Enhanced checkpoints for low-latency queries

Delta Lake writes checkpoints as an aggregate state of a Delta table at an optimized frequency. These checkpoints serve as the starting point to compute the latest state of the table. Without checkpoints, Delta Lake would have to read a large collection of JSON files (“delta” files) representing commits to the transaction log to compute the state of a table. In addition, the column-level statistics Delta Lake uses to perform data skipping are stored in the checkpoint.

Important

Delta Lake checkpoints are different from Structured Streaming checkpoints.

In Databricks Runtime 7.3 LTS and above, column-level statistics are stored in both struct and JSON format (the JSON copy is kept for backward compatibility). The struct format makes Delta Lake reads much faster, because:

  • Delta Lake doesn’t perform expensive JSON parsing to obtain column-level statistics.
  • Parquet column pruning capabilities significantly reduce the I/O required to read the statistics for a column.

The struct format enables a collection of optimizations that reduce the overhead of Delta Lake read operations from seconds to tens of milliseconds, which significantly reduces the latency for short queries.

Manage column-level statistics in checkpoints

You manage how statistics are written in checkpoints using the table properties delta.checkpoint.writeStatsAsJson and delta.checkpoint.writeStatsAsStruct. If both table properties are false, Delta Lake cannot perform data skipping.

  • Batch writes record statistics in both JSON and struct format. delta.checkpoint.writeStatsAsJson is true by default.
  • delta.checkpoint.writeStatsAsStruct is undefined by default.
  • Readers use the struct column when available and otherwise fall back to using the JSON column.
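To check how an existing table is configured, you can read the two properties with `SHOW TBLPROPERTIES`, which returns rows with `key` and `value` columns. A minimal sketch, assuming an active SparkSession named `spark`; the helper names are hypothetical. A value of `None` means the property is not set explicitly, so the runtime default described above applies.

```python
from typing import Dict, Iterable, Optional, Tuple

STATS_KEYS = ("delta.checkpoint.writeStatsAsJson", "delta.checkpoint.writeStatsAsStruct")


def checkpoint_stats_settings(rows: Iterable[Tuple[str, str]]) -> Dict[str, Optional[str]]:
    """Pick the two checkpoint-statistics properties out of (key, value) pairs."""
    props = dict(rows)
    return {key: props.get(key) for key in STATS_KEYS}


def data_skipping_disabled(settings: Dict[str, Optional[str]]) -> bool:
    """True only if both properties are explicitly false, the case in which data skipping is lost."""
    return all((value or "").lower() == "false" for value in settings.values())


def read_checkpoint_stats_settings(spark, table: str) -> Dict[str, Optional[str]]:
    rows = spark.sql(f"SHOW TBLPROPERTIES {table}").collect()
    return checkpoint_stats_settings((row["key"], row["value"]) for row in rows)
```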

For streaming writes:

  • Databricks Runtime 7.5 and above: write statistics in both JSON format and struct format.
  • Databricks Runtime 7.3 LTS and 7.4: write statistics in only JSON format (to minimize the impact of checkpoints on write latency). To also write the struct format, see Enable enhanced checkpoints for Structured Streaming queries.

Important

Enhanced checkpoints do not break compatibility with open source Delta Lake readers. However, setting delta.checkpoint.writeStatsAsJson to false may have implications for proprietary Delta Lake readers. Contact your vendors to learn more about performance implications.

Enable enhanced checkpoints for Structured Streaming queries

If your Structured Streaming workloads don’t have low-latency requirements (sub-minute latencies), you can enable enhanced checkpoints by running the following SQL command:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

You can also improve the checkpoint write latency by setting the following table properties:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

If data skipping is not useful in your application, you can set both properties to false. Then no statistics are collected or written. Databricks does not recommend this configuration.