Use liquid clustering for Delta tables
Delta Lake liquid clustering replaces table partitioning and ZORDER to simplify data layout decisions and optimize query performance. Liquid clustering provides the flexibility to redefine clustering keys without rewriting existing data, allowing data layout to evolve alongside analytic needs over time.
Important
Databricks recommends using Databricks Runtime 15.2 and above for all tables with liquid clustering enabled. Public preview support with limitations is available in Databricks Runtime 13.3 LTS and above.
Note
Tables with liquid clustering enabled support row-level concurrency in Databricks Runtime 13.3 LTS and above. Row-level concurrency is generally available in Databricks Runtime 14.2 and above for all tables with deletion vectors enabled. See Isolation levels and write conflicts on Azure Databricks.
What is liquid clustering used for?
Databricks recommends liquid clustering for all new Delta tables. The following are examples of scenarios that benefit from clustering:
- Tables often filtered by high cardinality columns.
- Tables with significant skew in data distribution.
- Tables that grow quickly and require maintenance and tuning effort.
- Tables with concurrent write requirements.
- Tables with access patterns that change over time.
- Tables where a typical partition key could leave the table with too many or too few partitions.
Enable liquid clustering
You can enable liquid clustering on an existing table or during table creation. Clustering is not compatible with partitioning or ZORDER, and requires that you use Azure Databricks to manage all layout and optimization operations for data in your table. After liquid clustering is enabled, run OPTIMIZE jobs as usual to incrementally cluster data. See How to trigger clustering.
To enable liquid clustering, add the CLUSTER BY clause to a table creation statement, as in the examples below:
Note
In Databricks Runtime 14.2 and above, you can use the DataFrame API and DeltaTable API in Python or Scala to enable liquid clustering.
SQL
-- Create an empty table
CREATE TABLE table1(col0 int, col1 string) CLUSTER BY (col0);
-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0) -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;
-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;
Python
# Create an empty table
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute())
# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
# CTAS using DataFrameWriterV2 (the target must be a new table, distinct from the source)
df = spark.read.table("table1")
df.writeTo("table3").using("delta").clusterBy("col0").create()
Scala
// Create an empty table
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
// CTAS using DataFrameWriterV2 (the target must be a new table, distinct from the source)
val df2 = spark.read.table("table1")
df2.writeTo("table3").using("delta").clusterBy("col0").create()
In Databricks Runtime 16.0 and above, you can create tables with liquid clustering enabled using Structured Streaming writes, as in the following examples:
Python
(spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
)
Scala
spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
Warning
Tables created with liquid clustering enabled have numerous Delta table features enabled at creation and use Delta writer version 7 and reader version 3. You can override the enablement of some of these features. See Override default feature enablement (optional).
Table protocol versions cannot be downgraded, and tables with clustering enabled are not readable by Delta Lake clients that do not support all enabled Delta reader protocol table features. See How does Azure Databricks manage Delta Lake feature compatibility?.
You can enable liquid clustering on an existing unpartitioned Delta table using the following syntax:
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
Important
Default behavior does not apply clustering to previously written data. To force reclustering for all records, you must use OPTIMIZE FULL. See Force reclustering for all records.
Override default feature enablement (optional)
You can override default behavior that enables Delta table features during liquid clustering enablement. This prevents the reader and writer protocols associated with those table features from being upgraded. You must have an existing table to complete the following steps:
Use ALTER TABLE to set the table property that disables one or more features. For example, to disable deletion vectors, run the following:

ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);

Enable liquid clustering on the table by running the following:

ALTER TABLE <table_name> CLUSTER BY (<clustering_columns>)
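Taken together, the two steps above can be sketched as a single sequence. The table name `events` and clustering key `user_id` are hypothetical, and which properties you disable depends on your compatibility requirements:

```sql
-- Sketch: opt out of deletion vectors and row tracking before clustering,
-- then enable liquid clustering ('events' and 'user_id' are hypothetical names)
ALTER TABLE events SET TBLPROPERTIES (
  'delta.enableDeletionVectors' = false,
  'delta.enableRowTracking' = false
);
ALTER TABLE events CLUSTER BY (user_id);
```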
The following table provides information on the Delta features you can override and how enablement impacts compatibility with Databricks Runtime versions.
Delta feature | Runtime compatibility | Property to override enablement | Impact of disablement on liquid clustering
---|---|---|---
Deletion vectors | Reads and writes require Databricks Runtime 12.2 LTS and above. | 'delta.enableDeletionVectors' = false | Row-level concurrency is disabled, making transactions and clustering operations more likely to conflict. See Write conflicts with row-level concurrency. DELETE, MERGE, and UPDATE commands might run slower.
Row tracking | Writes require Databricks Runtime 13.3 LTS and above. Can be read from any Databricks Runtime version. | 'delta.enableRowTracking' = false | Row-level concurrency is disabled, making transactions and clustering operations more likely to conflict. See Write conflicts with row-level concurrency.
Checkpoints V2 | Reads and writes require Databricks Runtime 13.3 LTS and above. | 'delta.checkpointPolicy' = 'classic' | No impact on liquid clustering behavior.
Choose clustering keys
Databricks recommends choosing clustering keys based on commonly used query filters. Clustering keys can be defined in any order. If two columns are correlated, you only need to add one of them as a clustering key.
You can specify up to four columns as clustering keys. Only columns with statistics collected can be specified as clustering keys. By default, the first 32 columns in a Delta table have statistics collected. See Specify Delta statistics columns.
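If a prospective clustering key falls outside the columns with statistics collected, one way to collect statistics for it is the delta.dataSkippingStatsColumns table property. This is a sketch with hypothetical table and column names:

```sql
-- Sketch: collect statistics for specific columns so they can be used
-- as clustering keys ('events', 'user_id', and 'event_date' are hypothetical names)
ALTER TABLE events SET TBLPROPERTIES (
  'delta.dataSkippingStatsColumns' = 'user_id,event_date'
);
```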
Clustering supports the following data types for clustering keys:
- Date
- Timestamp
- TimestampNTZ (requires Databricks Runtime 14.3 LTS or above)
- String
- Integer
- Long
- Short
- Float
- Double
- Decimal
- Byte
If you’re converting an existing table, consider the following recommendations:
Current data optimization technique | Recommendation for clustering keys |
---|---|
Hive-style partitioning | Use partition columns as clustering keys. |
Z-order indexing | Use the ZORDER BY columns as clustering keys. |
Hive-style partitioning and Z-order | Use both partition columns and ZORDER BY columns as clustering keys. |
Generated columns to reduce cardinality (for example, date for a timestamp) | Use the original column as a clustering key, and don’t create a generated column. |
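Following the Z-order row above, for example, an unpartitioned table previously maintained with ZORDER BY could adopt the same columns as clustering keys. The names below are hypothetical:

```sql
-- Sketch: adopt former ZORDER BY columns as clustering keys
-- ('events', 'user_id', and 'event_date' are hypothetical names)
ALTER TABLE events CLUSTER BY (user_id, event_date);

-- Incrementally cluster data going forward
OPTIMIZE events;
```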
Write data to a clustered table
You must use a Delta writer client that supports all Delta write protocol table features used by liquid clustering. On Azure Databricks, you must use Databricks Runtime 13.3 LTS and above.
Operations that cluster on write include the following:
- INSERT INTO operations
- CTAS and RTAS statements
- COPY INTO from Parquet format
- spark.write.mode("append")
Structured Streaming writes never trigger clustering on write. Additional limitations apply. See Limitations.
Clustering on write only triggers when data in the transaction meets a size threshold. These thresholds vary by the number of clustering columns and are lower for Unity Catalog managed tables than other Delta tables.
Number of clustering columns | Threshold size for Unity Catalog managed tables | Threshold size for other Delta tables |
---|---|---|
1 | 64 MB | 256 MB |
2 | 256 MB | 1 GB |
3 | 512 MB | 2 GB |
4 | 1 GB | 4 GB |
Because not all operations apply liquid clustering, Databricks recommends frequently running OPTIMIZE to ensure that all data is efficiently clustered.
How to trigger clustering
Predictive optimization automatically runs OPTIMIZE commands for enabled tables. See Predictive optimization for Unity Catalog managed tables.
To trigger clustering, you must use Databricks Runtime 13.3 LTS or above. Use the OPTIMIZE command on your table, as in the following example:
OPTIMIZE table_name;
Liquid clustering is incremental, meaning that data is only rewritten as necessary to accommodate data that needs to be clustered. Data files with clustering keys that do not match data to be clustered are not rewritten.
For best performance, Databricks recommends scheduling regular OPTIMIZE jobs to cluster data. For tables experiencing many updates or inserts, Databricks recommends scheduling an OPTIMIZE job every one or two hours. Because liquid clustering is incremental, most OPTIMIZE jobs for clustered tables run quickly.
Force reclustering for all records
In Databricks Runtime 16.0 and above, you can force reclustering of all records in a table with the following syntax:
OPTIMIZE table_name FULL;
Important
Running OPTIMIZE FULL reclusters all existing data as necessary. For large tables that have not previously been clustered on the specified keys, this operation might take hours.

Run OPTIMIZE FULL when you enable clustering for the first time or change clustering keys. If you have previously run OPTIMIZE FULL and there has been no change to clustering keys, OPTIMIZE FULL runs the same as OPTIMIZE. Always use OPTIMIZE FULL to ensure that data layout reflects the current clustering keys.
Read data from a clustered table
You can read data in a clustered table using any Delta Lake client that supports reading deletion vectors. For best query performance, include clustering keys in your query filters, as in the following example:
SELECT * FROM table_name WHERE cluster_key_column_name = 'some_value';
Change clustering keys
You can change clustering keys for a table at any time by running an ALTER TABLE command, as in the following example:
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
When you change clustering keys, subsequent OPTIMIZE and write operations use the new clustering approach, but existing data is not rewritten.
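To also rewrite existing data under the new keys, combine the key change with OPTIMIZE FULL (Databricks Runtime 16.0 and above), as in this sketch:

```sql
-- Sketch: switch clustering keys, then rewrite existing data under them
-- (OPTIMIZE FULL requires Databricks Runtime 16.0 and above)
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
OPTIMIZE table_name FULL;
```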
You can also turn off clustering by setting the keys to NONE, as in the following example:
ALTER TABLE table_name CLUSTER BY NONE;
Setting clustering keys to NONE does not rewrite data that has already been clustered, but prevents future OPTIMIZE operations from using clustering keys.
See how a table is clustered
You can use DESCRIBE commands to see the clustering keys for a table, as in the following examples:
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
Compatibility for tables with liquid clustering
Tables created with liquid clustering in Databricks Runtime 14.1 and above use v2 checkpoints by default. You can read and write tables with v2 checkpoints in Databricks Runtime 13.3 LTS and above.
You can disable v2 checkpoints and downgrade table protocols to read tables with liquid clustering in Databricks Runtime 12.2 LTS and above. See Drop Delta table features.
Limitations
The following limitations exist:
- In Databricks Runtime 15.1 and below, clustering on write does not support source queries that include filters, joins, or aggregations.
- Structured Streaming workloads do not support clustering on write.
- In Databricks Runtime 15.4 LTS and below, you cannot create a table with liquid clustering enabled using a Structured Streaming write. You can use Structured Streaming to write data to an existing table with liquid clustering enabled.