重要
此功能在 Databricks Runtime 15.4 LTS 及更高版本中处于公共预览阶段。
启用了类型扩展的表允许将列的数据类型更改为更宽泛的类型,而无需重写基础数据文件。 你可以手动更改列类型,也可以使用架构演变来改进列类型。
重要
Databricks Runtime 15.4 LTS 及更高版本中支持类型扩展。 启用了加宽类型的表只能在 Databricks Runtime 15.4 LTS 及更高版本中读取。
类型扩展需要 Delta Lake。 所有 Unity Catalog 托管表都默认使用 Delta Lake。
支持的类型更改
可以根据以下规则扩展类型:
| 源类型 | 支持的更广泛类型 |
|---|---|
BYTE |
SHORT、INT、BIGINT、DECIMAL、DOUBLE |
SHORT |
int、BIGINT、DECIMAL、DOUBLE |
INT |
.- . |
BIGINT |
DECIMAL |
FLOAT |
DOUBLE |
DECIMAL |
具有更高精度和小数位数的 DECIMAL |
DATE |
TIMESTAMP_NTZ |
顶层列和嵌套在结构、映射和数组中的字段支持类型更改。
重要
默认情况下,当一个操作将整数类型提升为 decimal 或 double,并且下游处理将该值写回整数列时,Spark 会截断该值的小数部分。 有关分配策略行为的详细信息,请参阅 存储分配。
注意
将任何数值类型更改为 decimal时,总精度必须等于或大于起始精度。 如果还增加规模,总体精度也必须相应增加。
byte、short和 int 类型的最小目标为 decimal(10,0)。
long 的最低目标是 decimal(20,0)。
如果要将两个小数位数添加到具有 decimal(10,1)的字段,则最小目标为 decimal(12,3)。
启用类型扩展
你可以通过将 delta.enableTypeWidening 表属性设置为 true,在现有表上启用类型扩展:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
你还可以在创建表期间启用类型扩展:
CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
重要
启用类型扩展时,它将设置表功能 typeWidening,这将升级读取器和编写器协议。 必须使用 Databricks Runtime 15.4 或更高版本才能与已启用类型扩大的表进行交互。 如果外部客户端也与表交互,请验证它们是否支持此表功能。 请参阅 Delta Lake 功能兼容性和协议。
手动应用类型更改
使用 ALTER COLUMN 命令手动更改类型:
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>
此操作可在不重写基础数据文件的情况下更新表架构。 有关详细信息,请参阅 ALTER TABLE。
使用自动架构演变扩展类型
架构演变适用于类型扩展以更新目标表中的数据类型以匹配传入数据的类型。
注意
如果未启用类型扩大,架构演变始终会尝试向下转换数据以匹配目标表中的列类型。 如果不希望自动扩展目标表中的数据类型,请在启用架构演变的情况下在运行工作负荷之前禁用类型扩展。
若要在引入期间使用架构演变来扩大列的数据类型,必须满足以下条件:
- 写入命令在启用自动架构演变的情况下运行。
- 目标表已启用类型扩展。
- 源列类型比目标列类型宽泛。
- 类型扩展支持类型更改。
不满足所有这些条件的类型不匹配遵循正常的架构强制规则。 请参阅架构强制。
以下示例演示了类型扩宽如何在架构演变中适用于常见写入操作。
Python
# Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
# Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
# Example 2: Automatic type widening in MERGE INTO
from delta.tables import DeltaTable
source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")
(target_table.alias("target")
.merge(source_df.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala(编程语言)
// Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
// Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
// Example 2: Automatic type widening in MERGE INTO
import io.delta.tables.DeltaTable
val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")
targetTable.alias("target")
.merge(sourceDf.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
SQL
-- Create target table with INT column and source table with BIGINT column
CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);
-- Example 1: Automatic type widening in INSERT INTO
---- Enable schema evolution
SET spark.databricks.delta.schema.autoMerge.enabled = true;
---- Insert data with BIGINT value column - automatically widens INT to BIGINT
INSERT INTO target_table SELECT * FROM source_table;
-- Example 2: Automatic type widening in MERGE INTO
MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
禁用类型扩展表功能
通过将属性设置为 false,可以阻止启用的表上发生意外类型扩展:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')
此设置可防止对表进行将来的类型更改,但不会删除类型扩大表功能或撤消以前的类型更改。
如果需要完全删除类型扩展表功能,可以使用 DROP FEATURE 命令,如以下示例所示:
ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
注意
使用 Databricks Runtime 15.4 LTS 启用了类型扩大的数据库表,需要转而删除功能 typeWidening-preview。
弃用类型扩展时,Databricks 将重写不符合当前表架构的所有数据文件。 请参阅删除 Delta Lake 表功能并降级表协议。
从 Delta 表进行流式传输
注意
Databricks Runtime 16.4 LTS 及更高版本中提供了对结构化流式处理中的类型扩展的支持。
从启用了类型加宽的 Delta 表进行流式处理时,可以通过在目标表上启用 mergeSchema 选项来进行架构演变,以配置流式查询的自动类型加宽。 目标表必须启用类型加宽。 请参阅 “启用类型扩大”。
Python
(spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
)
Scala(编程语言)
spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
当启用mergeSchema并且目标表已启用类型扩展时:
- 类型更改会自动应用于下游表,而无需手动干预。
- 下游表结构会自动添加新列。
如果未 mergeSchema 启用,则根据 spark.sql.storeAssignmentPolicy 配置处理值,默认情况下,向下转换值以匹配目标列类型。 有关分配策略行为的详细信息,请参阅 存储分配。
手动类型更改确认
从 Delta 表流式传输时,可以提供架构跟踪位置来跟踪非累加架构更改,包括类型更改。 在 Databricks Runtime 18.0 及更低版本中提供架构跟踪位置是必需的,在 Databricks Runtime 18.1 及更高版本中是可选的。
Python
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
Scala(编程语言)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
提供架构跟踪位置后,流在检测到类型更改后会改进其跟踪的架构,然后停止。 此时,可以采取任何必要作来处理类型更改,例如在下游表上启用类型扩大或更新流式处理查询。
若要恢复处理,请设置 Spark 配置 spark.databricks.delta.streaming.allowSourceColumnTypeChange 或 DataFrame 读取器选项 allowSourceColumnTypeChange:
Python
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
Scala(编程语言)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
// alternatively to allow all future type changes for this stream:
// .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
SQL
-- To unblock for this particular stream just for this series of schema change(s):
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
-- To unblock for this particular stream:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
-- To unblock for all streams:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"
流停止时,检查点 ID <checkpoint_id> 和 Delta Lake 源表版本 <delta_source_table_version> 会显示在错误消息中。
Lakeflow Spark 声明性管道
注意
在预览频道中提供了对 Lakeflow Spark 声明性管道中类型扩展的支持。
可以在管道级别或单个表上为 Lakeflow Spark Declarative Pipelines 启用类型扩展功能。 类型扩大允许在管道执行期间自动扩大列类型,而无需完全刷新流式处理表。 具体化视图中的类型更改始终触发完全重新计算,当类型更改应用于源表时,依赖于该表的具体化视图需要完全重新计算才能反映新类型。
启用整个流水线的类型拓宽
若要为管道中的所有表启用类型扩展,请设置管道配置 pipelines.enableTypeWidening:
JSON
{
"configuration": {
"pipelines.enableTypeWidening": "true"
}
}
YAML
configuration:
pipelines.enableTypeWidening: 'true'
为特定表启用类型扩展
还可以通过设置表属性 delta.enableTypeWidening为单个表启用类型扩大:
Python
import dlt
@dlt.table(
table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
return spark.readStream.table("source_table")
SQL
CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table
与下游读取器的兼容性
启用了加宽类型的表只能在 Databricks Runtime 15.4 LTS 及更高版本中读取。 如果希望在管道中启用了类型加宽的表可以被 Databricks Runtime 14.3 或更低版本所读取,则必须:
- 通过删除属性
delta.enableTypeWidening/pipelines.enableTypeWidening或将其设置为 false 来禁用类型拓展,并触发表的完全刷新。 - 在表上启用 兼容性模式 。
Delta共享
注意
Databricks Runtime 16.1及更高版本支持Type Widening在Delta Sharing中的应用。
在 Databricks 的 Delta Sharing 中,支持共享启用了类型扩展的 Delta Lake 表。 提供程序和收件人必须位于 Databricks Runtime 16.1 或更高版本上。
要使用 Delta Sharing 从启用了类型扩展的 Delta Lake 表中读取变更数据馈送,必须将响应格式设置为 delta:
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")
不支持在类型更改过程中读取更改数据流。 您必须将操作拆分为两个单独的读取,一个在包含类型更改的表版本结束,另一个从包含类型更改的表版本开始。
局限性
Apache Iceberg 兼容性
Apache Iceberg 不支持类型扩大涵盖的所有类型更改,请参阅 Iceberg 架构演变。 具体而言,Azure Databricks 不支持以下类型更改:
-
byte、short、int、long到decimal或double - 小数位数增加
-
date至timestampNTZ
在 Delta Lake 表上启用与 Iceberg 兼容的 UniForm 时,应用这些类型更改之一将导致错误。 请参阅 使用 Iceberg 客户端读取 Delta 表。
如果将以下不受支持的类型更改之一应用于 Delta Lake 表,有两个选项:
重新生成 Iceberg 元数据:使用以下命令重新生成 Iceberg 元数据,而无需类型扩大表功能:
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')这样,就可以在应用不兼容的类型更改后保持统一兼容性。
删除类型扩大表功能:请参阅 “禁用类型扩大表”功能。
类型依赖函数
某些 SQL 函数返回依赖于输入数据类型的结果。 例如,如果参数类型不同, hash 则函数 返回相同逻辑值的不同哈希值: hash(1::INT) 返回的结果不同于 hash(1::BIGINT)。
其他类型依赖函数包括:xxhash64、、bit_getbit_reverse、 typeof。
对于使用这些函数的查询中的稳定结果,请显式将值强制转换为所需类型:
Python
spark.read.table("table_name") \
.selectExpr("hash(CAST(column_name AS BIGINT))")
Scala(编程语言)
spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
.selectExpr("hash(CAST(a AS BIGINT))")
SQL
-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name
其他限制
- 当从具有类型更改的 Delta Lake 表流式传输时,不能使用 SQL 提供架构跟踪位置。
- 您不能使用 Delta Sharing 将启用类型拓宽的表共享给非 Databricks 消费者。