液體聚類是一種資料佈局最佳化技術,它取代了表分割和 ZORDER。 它簡化了表管理,並透過根據叢集鍵自動組織資料來優化查詢效能。
與傳統分割不同,您可以重新定義叢集索引鍵,而無需重寫現有資料。 這可讓您的資料版面配置隨著分析需求的變化而發展。 液體叢集同時適用於串流資料表和具體化檢視。
Important
Liquid 叢集通常適用於 Delta Lake 數據表和受控 Apache Iceberg 數據表的公開預覽版。 針對 Delta Lake 資料表,Databricks Runtime 15.2 和更新版本提供 GA 支援。 Databricks 建議使用最新的 Databricks Runtime 以獲得最佳效能。 針對 Apache Iceberg 數據表,需要 Databricks Runtime 16.4 LTS 和更新版本。
何時使用液態聚類
Databricks 建議針對所有新資料表使用流動叢集,包括串流資料表和具體化檢視。 下列案例特別受益於叢集:
- 通常由高基數資料行篩選的資料表。
- 資料分佈有偏差的資料表。
- 快速增長且需要維護和調整的資料表。
- 具有並行寫入需求的資料表。
- 具有隨時間變更的存取模式的資料表。
- 資料表中,典型的分割索引鍵可能會導致該資料表擁有過多或過少的資料分割區。
啟用液體群集
您可以在現有未分割的資料表上或在建立資料表期間啟用液體叢集。 叢集與分割或ZORDER不相容。 Databricks 建議允許平臺管理資料表中資料的所有版面配置和最佳化作業。 啟用液體叢集之後,請執行 OPTIMIZE 工作以累加叢集資料。 請參閱 如何觸發叢集。
使用叢集建立資料表
若要啟用液體群集,請將 CLUSTER BY 字串新增至資料表創建語句,如下列範例所示。 在 Databricks Runtime 14.2 和更新版本中,您可以使用 Python 或 Scala 中的 DataFrame API 和 DeltaTable API 來啟用 Delta Lake 數據表的液體叢集。
SQL
-- Create an empty Delta table with clustering on col0
CREATE TABLE table1(col0 INT, col1 string) CLUSTER BY (col0);
-- Create table from existing data with clustering
-- Note: CLUSTER BY must appear after table name, not in SELECT clause
CREATE TABLE table2 CLUSTER BY (col0)
AS SELECT * FROM table1;
-- Copy table structure including clustering configuration
CREATE TABLE table3 LIKE table1;
Python
# Create an empty Delta table with clustering on col0
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0") # Single clustering key
.execute())
# Create clustered table from existing DataFrame
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
# Alternative: DataFrameWriterV2 API (DBR 14.2+)
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
Scala
// Create an empty Delta table with clustering on col0
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
// Create clustered table from existing DataFrame
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
// Alternative: DataFrameWriterV2 API (DBR 14.2+)
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
Important
使用 DataFrame API 設定叢集索引鍵時,只能在建立表格時指定叢集列,或是在使用 overwrite 模式(例如進行 CREATE OR REPLACE TABLE 操作)時設定。 使用 append 模式時,您無法變更叢集索引鍵。
若要在附加資料時變更現有資料表上的叢集索引鍵,請使用 SQL ALTER TABLE 命令將叢集組態與資料寫入作業分開修改。
在 Databricks Runtime 16.0 和更新版本中,您可以使用結構化串流寫入來建立已啟用液體叢集的資料表。 Databricks 建議使用 Databricks Runtime 16.4 和更新版本以獲得最佳效能,如下列範例所示:
SQL
CREATE TABLE table1 (
col0 STRING,
col1 DATE,
col2 BIGINT
)
CLUSTER BY (col0, col1);
Python
(spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
)
Scala
spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
Warning
已啟用液體叢集的 Delta 資料表會使用 Delta 寫入器第 7 版和讀取器第 3 版。 不支援這些協議的 Delta 客戶端無法讀取這些資料表。 您無法降級資料表通訊協定版本。 請參閱 Delta Lake 功能相容性和通訊協定。
若要覆寫預設功能啟用 (例如刪除向量),請參閱 覆寫預設功能啟用 (選擇性)。
在現有資料表上啟用
使用下列語法,在現有未分割的 Delta 資料表上啟用液體叢集:
-- Alter an existing table
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
針對 Apache Iceberg,您必須在現有的 Managed Iceberg 數據表上啟用液體群集時,明確停用刪除向量和數據列標識符。
Note
默認行為不會將叢集套用至先前寫入的數據。 若要強制對所有記錄重新分群,您必須使用 OPTIMIZE FULL。 請參見對所有記錄進行強制重新群集。
移除叢集索引鍵
若要移除叢集索引鍵,請使用下列語法:
ALTER TABLE table_name CLUSTER BY NONE;
選擇叢集金鑰
小提示
Databricks 建議針對支援的資料表使用自動液體叢集,這會根據您的查詢模式智慧地選取叢集索引鍵。 請參閱 自動液體群集。
關鍵選擇指南
當您手動指定叢集索引鍵時,請根據查詢篩選器中最常使用的欄來選擇相應的欄位。 您可以依任何順序定義叢集索引鍵。 如果兩個數據行高度相互關聯,您只需要將其中一個數據行當做叢集索引鍵來包含。
您最多可以指定 四個叢集索引鍵。 對於較小的資料表 (低於 10 TB),使用更多叢集索引鍵可能會降低單一資料行篩選時的效能。 例如,使用四個索引鍵進行篩選的效能比使用兩個索引鍵進行篩選的效能更差。 不過,隨著資料表大小的增加,對於單欄查詢來說,這種效能差異可以忽略不計。
叢集鍵必須是已收集統計資料的欄位。 依預設,Delta 表中的前 32 欄已收集統計資料。 請參閱指定 Delta 統計資料欄。
支援的數據類型
叢集支援叢集索引鍵的下列資料類型:
- Date
- Timestamp
- TimestampNTZ (Databricks Runtime 14.3 LTS 和更新版本)
- String
- 整數、長整數、短整數、位元組
- 浮點、雙精度、十進制
從分區或 Z 序遷移
如果您要轉換現有的數據表,請考慮下列建議:
| 目前的數據優化技術 | 叢集金鑰的建議 |
|---|---|
| Hive 格式分割 | 使用分割區數據行作為叢集索引鍵。 |
| Z 順序索引 | 使用 ZORDER BY 列用作叢集索引鍵。 |
| Hive 風格的資料分割和 Z 順序 | 同時使用分區列和 ZORDER BY 列作為叢集鍵。 |
| 自動產生的字段用于降低基數(例如,從時間戳中提取的日期) | 使用原始數據行作為叢集索引鍵,而且不會建立產生的數據行。 |
自動液體群集
在 Databricks Runtime 15.4 LTS 和更新版本中,您可以啟用 Unity Catalog 管理的 Delta 表的自動液體叢集化。 自動液態叢集允許 Azure Databricks 使用 CLUSTER BY AUTO 子句,以智慧方式選擇叢集索引鍵,從而優化查詢效能。
自動液體叢集的工作原理
自動液體叢集會根據您的使用模式提供智慧最佳化:
- 需要預測最佳化:自動金鑰選取和叢集作業會以非同步方式執行為維護作業。 請參閱 Unity Catalog 受控資料表的預測性最佳化。
- 分析查詢工作負載:Azure Databricks 會分析資料表的歷史查詢工作負載,並識別叢集的最佳候選資料行。
- 適應變化:如果您的查詢模式或資料分佈隨時間變化,自動液體叢集會選擇新鍵來優化效能。
- 成本感知選擇:只有在資料略過改善的預測成本節省超過資料叢集成本時,Azure Databricks 才會變更叢集索引鍵。
自動液體聚類可能不會選取索引鍵,原因可能包括以下幾點:
- 表格太小,無法利用液體群集的優勢。
- 表格已具有有效的叢集配置,無論是來自先前的手動索引鍵,還是符合查詢模式的自然插入順序。
- 數據表沒有頻繁的查詢。
- 您不是使用 Databricks Runtime 15.4 LTS 或更新版本。
不論數據和查詢特性為何,您都可以為所有 Unity 目錄受控數據表套用自動液體叢集。 啟發式會決定選取叢集索引鍵是否具有成本效益。
DBR 版本相容性
您可以從支援液體叢集的所有 Databricks Runtime 版本,讀取或寫入已啟用自動叢集的數據表。 不過,智慧型密鑰選取依賴 Databricks Runtime 15.4 LTS 中引進的元數據。
使用 Databricks Runtime 15.4 LTS 或更新版本來確保自動選取的密鑰有利於您所有的工作負載,並在選取新密鑰時考慮這些工作負載。
啟用或停用自動液體群集
若要在新的或現有的數據表上啟用或停用自動液體群集,請使用下列語法:
SQL
-- Create an empty table.
CREATE OR REPLACE TABLE table1(column01 int, column02 string) CLUSTER BY AUTO;
-- Enable automatic liquid clustering on an existing table,
-- including tables that previously had manually specified keys.
ALTER TABLE table1 CLUSTER BY AUTO;
-- Disable automatic liquid clustering on an existing table.
ALTER TABLE table1 CLUSTER BY NONE;
-- Disable automatic liquid clustering by setting the clustering keys
-- to chosen clustering columns or new columns.
ALTER TABLE table1 CLUSTER BY (column01, column02);
如果您在未指定CREATE OR REPLACE table_name的情況下執行CLUSTER BY AUTO,且表格已存在且已啟用自動液體叢集,則會停用設定AUTO,且不會保留叢集直欄。 若要保留自動液體群集和任何先前選取的群集欄,請在 replace 陳述式中包含 CLUSTER BY AUTO。 保留時,預測最佳化會維護資料表的歷史查詢工作負載,以識別最佳叢集索引鍵。
Python
df = spark.read.table("table1")
df.write
.format("delta")
.option(“clusterByAuto”, “true”)
.saveAsTable(...)
# Set clustering columns and auto to provide a hint for initial selection
df.write
.format("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option("clusterByAuto", "true")
.saveAsTable(...)
# Using DataFrameWriterV2
df.writeTo(...).using("delta")
.option("clusterByAuto", "true")
.create()
# Set clustering columns and auto to provide a hint for initial selection
df.writeTo(...).using("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option("clusterByAuto", "true")
.create()
# Set clusterByAuto for streaming tables
spark.readStream.table("source_table")
.writeStream
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
# Specify a hint for clustering columns with both auto and columns
spark.readStream.table("source_table")
.writeStream
.clusterBy("column1", "column2")
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
Python API 適用於 Databricks Runtime 16.4 和更新版本。 當您將 .clusterBy 與 .option('clusterByAuto', 'true) 一起使用時,其行為如下:
- 如果這是第一次設定自動液體聚類,則始終遵循手動輸入,並在
.clusterBy中設定聚類欄位。 - 如果這已經是具有自動液體群集的數據表,則可以接受使用
.clusterBy的提示一次。 例如,只有在表格尚未設定叢集欄位時,才會設定.clusterBy指定的欄位。
您只能在建立或取代表格時使用 Python。 使用 SQL 來變更 clusterByAuto 現有資料表的狀態。
Important
使用 DataFrame API 時,選項clusterByAuto只能在使用overwrite模式時設定。 使用clusterByAuto模式時無法設定append。 此限制與手動設定分群資料行時相同 - 分群設定只能在資料表創建或替換作業中使用overwrite模式進行設定。
因應措施是,如果您想要在附加資料時變更 clusterByAuto 現有資料表的狀態,請使用 SQL ALTER TABLE 命令將叢集組態與資料寫入作業分開修改。
檢查是否已啟用自動叢集
若要檢查資料表是否已啟用自動液體群集,請使用 DESCRIBE TABLE 或 SHOW TBLPROPERTIES。
如果已啟用自動液體群集,屬性 clusterByAuto 會設定為 true。 屬性 clusteringColumns 會顯示自動或手動選取的目前叢集數據行。
Limitations
Apache Iceberg 不支援自動液體分組。
將數據寫入叢集數據表
若要寫入叢集 Delta 數據表,您必須使用支援液體叢集使用之所有 Delta 寫入通訊協議資料表功能的 Delta 寫入器用戶端。 若要寫入叢集的 Iceberg 資料表,您可以使用 Unity 目錄的 Iceberg REST 目錄 API。 在 Azure Databricks 上,您必須使用 Databricks Runtime 13.3 LTS 和更新版本。
支援寫入叢集的作業
寫入時叢集的操作包括以下各項:
-
INSERT INTO運作 -
CTAS和RTAS語句 -
COPY INTO來自於 Parquet 格式 spark.write.mode("append")
叢集的大小臨界值
只有當交易中的數據達到大小閾值時,才會在寫入操作中啟動叢集。 這些臨界值會因叢集列數目而有所不同,Unity Catalog 管理的表格比其他 Delta 表的臨界值更低。
| 叢集資料欄的數目 | Unity Catalog 管理表格的臨界大小 | 其他 Delta 數據表的臨界值大小 |
|---|---|---|
| 1 | 64 MB | 256 MB |
| 2 | 256 MB | 1 GB |
| 3 | 512 MB | 2 GB |
| 4 | 1 GB | 4 GB |
因為並非所有作業都套用液體叢集,因此 Databricks 建議經常執行 OPTIMIZE ,以確保所有數據都能有效率地叢集化。
串流工作負載
結構化串流工作負載在您將 Spark 配置 spark.databricks.delta.liquid.eagerClustering.streaming.enabled 設定為 true 時支援在寫入時進行叢集處理。 如果過去五次串流更新中至少有一次超過上表的大小閾值,這些工作負載的叢集才會觸發。
如何觸發叢集
預測優化會自動對已啟用的資料表執行命令 OPTIMIZE。 請參閱 Unity Catalog 受控資料表的預測性最佳化。 使用預測性優化時,Databricks 建議停用任何排程 OPTIMIZE 的工作。
若要觸發叢集,您必須使用 Databricks Runtime 13.3 LTS 或更新版本。 Databricks 建議使用 Databricks Runtime 17.2 和更新版本,以在大型資料表上取得更快 OPTIMIZE 的效能。 在資料表中使用 OPTIMIZE 命令:
OPTIMIZE table_name;
液體叢集是 增量的,這表示 OPTIMIZE 只會視需要重寫資料,以容納需要叢集的資料。
OPTIMIZE 不會使用與所叢集資料不符的叢集索引鍵重寫資料檔案。
如果您未使用預測性優化,Databricks 建議將一般 OPTIMIZE 作業排程到叢集數據。 對於進行很多更新或插入的數據表,Databricks 建議每隔一兩小時排程一次 OPTIMIZE 作業。 因為液體叢集是累加式的,因此叢集數據表的大部分 OPTIMIZE 作業都會快速執行。
強制將所有記錄重新群集
在 Databricks Runtime 16.0 及更高版本中,您可以使用以下語法來強制重新分群資料表中所有記錄:
OPTIMIZE table_name FULL;
Important
執行 OPTIMIZE FULL 視需要將所有現有的數據重新叢集。 對於先前未在指定索引鍵上叢集的大型數據表,此作業可能需要數小時的時間。
第一次啟用叢集或變更叢集金鑰時,請執行 OPTIMIZE FULL。 如果您先前已執行 OPTIMIZE FULL 且叢集金鑰沒有變更,OPTIMIZE FULL 執行與 OPTIMIZE相同。 在此案例中, OPTIMIZE 會使用累加方法,並只重寫先前尚未壓縮的檔案。 請一律使用 OPTIMIZE FULL,以確保數據配置反映目前的叢集索引鍵。
從叢集數據表讀取數據
您可以使用支援讀取刪除向量的任何 Delta Lake 用戶端,讀取叢集 Delta 資料表中的數據。 使用 Iceberg REST 目錄 API,您可以在叢集 Iceberg 數據表中讀取數據。 液體叢集在篩選叢集索引鍵時,透過自動略過資料來改善查詢效能。
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
管理叢集索引鍵
查看資料表的叢集方式
您可以使用 DESCRIBE 命令來檢視資料表的叢集索引鍵,如下列範例所示:
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
變更叢集金鑰
您可以執行 ALTER TABLE 命令,隨時變更資料表的叢集索引鍵,如下列範例所示:
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
當您變更叢集金鑰時,後續 OPTIMIZE 和寫入作業會使用新的叢集方法,但不會重寫現有的數據。
您也可以將金鑰設定為 NONE關閉叢集,如下列範例所示:
ALTER TABLE table_name CLUSTER BY NONE;
將叢集索引鍵設定為 NONE 不會重寫叢集資料,但會防止未來的 OPTIMIZE 作業使用叢集索引鍵。
使用外部引擎的液體叢集
您可以在受控管理的 Iceberg 資料表上,透過外部 Iceberg 引擎啟用 Liquid Clustering。 若要啟用液態群集,請在建立數據表時指定分割欄。 Unity 目錄會將分割區解譯為叢集索引鍵。 例如,在 OSS Spark 中執行下列命令:
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY c1;
您可以停用液體群集:
ALTER TABLE main.schema.icebergTable DROP PARTITION FIELD c2;
您可以使用 Iceberg 資料分割演進來變更叢集索引鍵:
ALTER TABLE main.schema.icebergTable ADD PARTITION FIELD c2;
如果您使用桶轉換指定分區,Unity Catalog 捨棄運算式,並使用資料欄作為叢集索引鍵。
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY (bucket(c1, 10));
與液體叢集的數據表相容性
液體集群使用 Delta 表功能,而這些功能需要特定版本的 Databricks Runtime 才能進行讀取和寫入。 在 Databricks Runtime 14.1 和更高版本中,使用液體群集的數據表會預設使用 v2 檢查點。 您可以在 Databricks Runtime 13.3 LTS 和更新版本中使用 v2 檢查點讀取和寫入數據表。
您可以在 Databricks Runtime 12.2 LTS 和更新版本停用 v2 檢查點和降級數據表通訊協定,以讀取具有液體叢集的數據表。 請參閱 刪除 Delta Lake 資料表功能和降級資料表協議。
覆寫預設功能啟用(可選)
您可以在啟用 Liquid Clustering 時覆寫預設的 Delta 表格功能。 這可防止與這些數據表功能相關聯的讀取器和寫入器通訊協議升級。 您必須有現有的資料表,才能完成下列步驟:
使用
ALTER TABLE來設定停用一或多個功能的數據表屬性。 例如,若要停用刪除向量,請執行下列動作:ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);執行下列命令,在資料表上啟用液體叢集:
ALTER TABLE <table_name> CLUSTER BY (<clustering_columns>)
下表提供您可以覆寫的 Delta 特徵的相關信息,以及啟用會如何影響與 Databricks Runtime 版本的相容性。
| Delta 功能 | 執行階段相容性 | 覆蓋啟用狀態的屬性 | 停用對液體群集的影響 |
|---|---|---|---|
| 刪除向量 | 讀取和寫入需要 Databricks Runtime 12.2 LTS 和更新版本。 | 'delta.enableDeletionVectors' = false |
停用刪除向量會停用資料列層級並行,使交易和叢集作業更容易發生衝突。 請參閱資料列層級並行的寫入衝突。DELETE、 MERGE和 UPDATE 命令的執行速度可能會變慢。 |
| 列追蹤 | 寫入操作需要 Databricks Runtime 13.3 LTS 或更高版本。 可以從任何 Databricks Runtime 版本進行讀取。 | 'delta.enableRowTracking' = false |
停用資料列追蹤功能會關閉資料列層級的併發控制,使交易和叢集運算作業更容易發生衝突。 請參閱資料列層級並行的寫入衝突。 |
| 檢查點 V2 | 讀取和寫入需要 Databricks Runtime 13.3 LTS 和更新版本。 | 'delta.checkpointPolicy' = 'classic' |
不會影響液體群集行為。 |
Limitations
- DBR 15.1 和更低版本:寫入時的叢集操作不支援帶有篩選、聯結或彙總的來源查詢。
- DBR 15.4 LTS 和更低版本:您無法使用結構化串流寫入來建立啟用液體叢集的資料表。 您可以使用結構化串流將資料寫入已啟用液體叢集的現有資料表。
- Apache Iceberg v2:使用 Apache Iceberg v2 的受管 Iceberg 資料表不支援資料列層級並行,因為 Iceberg 資料表不支援刪除向量和資料列追蹤。