本页介绍了 Delta Lake 列映射如何通过仅更改元数据来将列标记为已删除或重命名,而无需重写数据文件。
Azure Databricks 支持 Delta Lake 表的列映射。 列映射功能允许通过仅仅更改元数据来将列标示为已删除或重命名,而无需重写数据文件。 列映射还允许在列名(如空格)中使用 Parquet 不允许的字符。 这样,便可以在不重命名列的情况下将 CSV 或 JSON 数据直接引入 Delta。
先决条件和限制
在启用列映射之前,请了解以下限制:
- 启用了列映射的表只能在 Databricks Runtime 10.4 LTS 及更高版本中读取
- 启用列映射可能会中断:
- 依赖于目录名称来读取 Delta 表的旧工作负载。 具有列映射的分区表使用随机前缀,而不是分区目录的列名。 请参阅 Delta Lake 和 Parquet 是否共享分区策略?。
- 使用 Delta 更改数据馈送的下游操作。 请参阅 更改具有列映射的表的数据馈送限制。
- 从 Delta 表以流式方式读取数据,包括在 Lakeflow Spark Declarative Pipelines 中。 请参阅 列映射和流式处理。
启用列映射
使用以下命令启用列映射:
ALTER TABLE <table-name> SET TBLPROPERTIES (
'delta.columnMapping.mode' = 'name'
)
列映射需要以下 Delta 协议:
- 阅读器 2.0 或更高
- Writer 编辑器 5 版或更高版本
请参阅 Delta Lake 功能兼容性和协议。
重命名列
注意
在 Databricks Runtime 10.4 LTS 及更高版本中可用。
为 Delta 表启用列映射后,你可以重命名列:
ALTER TABLE <table-name> RENAME COLUMN old_col_name TO new_col_name
有关更多示例,请参阅 更新表架构。
删除列
注意
在 Databricks Runtime 11.3 LTS 及更高版本中可用。
为 Delta 表启用列映射后,你可删除一列或多列:
ALTER TABLE table_name DROP COLUMN col_name
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2, ...)
有关详细信息,请参阅 更新表架构。
列名中支持的字符
为 Delta 表启用列映射后,可以在列名中包含空格和其中任何字符: ,;{}()\n\t=
删除列映射
可以使用以下命令从表中删除列映射:
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.columnMapping.mode' = 'none')
警告
删除列映射会重写所有数据文件,以将物理列名替换为逻辑名称。 此操作不支持行级或物理冲突解决机制。
并发写入操作将导致ConcurrentModificationException。 删除列映射之前:
- 暂停所有并发写入操作,包括流式处理作业和ETL管道。
- 禁用表上的预测优化。
- 对于大型表,在低活动期间安排这个操作。
有关支持降级表协议的替代方法,请参阅 “禁用列映射”。
禁用列映射
在 Databricks Runtime 15.3 及更高版本中,可以使用 DROP FEATURE 命令删除列映射并降级表协议。 如果需要降级协议版本,以便与较旧的读取器兼容,请使用此方法而不是 删除列映射 。
重要
从表中删除列映射不会删除分区表的目录名称中使用的随机前缀。
列映射和流式处理
可以提供模式跟踪位置,以便在启用列映射的 Delta 表中进行流式处理。 这克服了非累加架构更改可能导致流中断的问题。
针对数据源的每个流式读取都必须指定自己的 schemaTrackingLocation。 指定的 schemaTrackingLocation 必须包含在为目标表的 checkpointLocation 指定的目录中,以便进行流式写入。 对于将多个源 Delta 表中的数据合并的流式传输工作负载,必须在 checkpointLocation 中为每个源表指定唯一目录。
在正在运行的作业上启用列映射
重要
若要在正在运行的流式处理作业上启用列映射,请执行以下操作:
- 停止作业
- 启用表上的列映射
- 重启作业(第一次重启 - 初始化列映射)
- 再次重启作业(第二次重启 - 启用架构更改)
任何进一步的架构更改(添加或删除列、更改列类型)也需要重启作业。
指定架构跟踪位置
以下示例演示如何通过列映射为从 Delta 表进行流式读取指定 schemaTrackingLocation:
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)