通过


使用 Delta Lake 列映射重命名和删除列

本页介绍了 Delta Lake 列映射如何通过仅更改元数据来将列标记为已删除或重命名,而无需重写数据文件。

Azure Databricks 支持 Delta Lake 表的列映射。 列映射功能允许通过仅仅更改元数据来将列标示为已删除或重命名,而无需重写数据文件。 列映射还允许在列名(如空格)中使用 Parquet 不允许的字符。 这样,便可以在不重命名列的情况下将 CSV 或 JSON 数据直接引入 Delta。

先决条件和限制

在启用列映射之前,请了解以下限制:

启用列映射

使用以下命令启用列映射:

ALTER TABLE <table-name> SET TBLPROPERTIES (
  'delta.columnMapping.mode' = 'name'
)

列映射需要以下 Delta 协议:

  • 阅读器 2.0 或更高
  • Writer 编辑器 5 版或更高版本

请参阅 Delta Lake 功能兼容性和协议

重命名列

注意

在 Databricks Runtime 10.4 LTS 及更高版本中可用。

为 Delta 表启用列映射后,你可以重命名列:

ALTER TABLE <table-name> RENAME COLUMN old_col_name TO new_col_name

有关更多示例,请参阅 更新表架构

删除列

注意

在 Databricks Runtime 11.3 LTS 及更高版本中可用。

为 Delta 表启用列映射后,你可删除一列或多列:

ALTER TABLE table_name DROP COLUMN col_name
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2, ...)

有关详细信息,请参阅 更新表架构

列名中支持的字符

为 Delta 表启用列映射后,可以在列名中包含空格和其中任何字符: ,;{}()\n\t=

删除列映射

可以使用以下命令从表中删除列映射:

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.columnMapping.mode' = 'none')

警告

删除列映射会重写所有数据文件,以将物理列名替换为逻辑名称。 此操作不支持行级或物理冲突解决机制。

并发写入操作将导致ConcurrentModificationException。 删除列映射之前:

  1. 暂停所有并发写入操作,包括流式处理作业和ETL管道。
  2. 禁用表上的预测优化
  3. 对于大型表,在低活动期间安排这个操作。

有关支持降级表协议的替代方法,请参阅 “禁用列映射”。

禁用列映射

在 Databricks Runtime 15.3 及更高版本中,可以使用 DROP FEATURE 命令删除列映射并降级表协议。 如果需要降级协议版本,以便与较旧的读取器兼容,请使用此方法而不是 删除列映射

重要

从表中删除列映射不会删除分区表的目录名称中使用的随机前缀。

请参阅删除 Delta Lake 表功能并降级表协议

列映射和流式处理

可以提供模式跟踪位置,以便在启用列映射的 Delta 表中进行流式处理。 这克服了非累加架构更改可能导致流中断的问题。

针对数据源的每个流式读取都必须指定自己的 schemaTrackingLocation。 指定的 schemaTrackingLocation 必须包含在为目标表的 checkpointLocation 指定的目录中,以便进行流式写入。 对于将多个源 Delta 表中的数据合并的流式传输工作负载,必须在 checkpointLocation 中为每个源表指定唯一目录。

在正在运行的作业上启用列映射

重要

若要在正在运行的流式处理作业上启用列映射,请执行以下操作:

  1. 停止作业
  2. 启用表上的列映射
  3. 重启作业(第一次重启 - 初始化列映射)
  4. 再次重启作业(第二次重启 - 启用架构更改)

任何进一步的架构更改(添加或删除列、更改列类型)也需要重启作业。

指定架构跟踪位置

以下示例演示如何通过列映射为从 Delta 表进行流式读取指定 schemaTrackingLocation

checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
  .option("schemaTrackingLocation", checkpoint_path)
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("output_table")
)

后续步骤