表支持架构演变,允许在数据要求发生变化时修改表结构。 支持下列类型的更改:
- 在任意位置添加新列
- 重新排列现有列
- 重命名现有列
使用 DDL 显式进行这些更改,或使用 DML 隐式进行这些更改。
重要
模式更新与所有并发写入发生冲突。 Databricks 建议协调架构更改,以避免写入冲突。
更新表架构会终止从该表读取的任何流。 若要继续处理,请使用 结构化流的生产注意事项中所述的方法重启流。
显式更新架构以添加列
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
默认情况下,Null 性为 true。
若要将列添加到嵌套字段,请使用:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
例如,如果运行 ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) 之前的架构为:
- root
| - colA
| - colB
| +-field1
| +-field2
则运行之后的架构为:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
注意
仅支持为结构添加嵌套列。 不支持数组和映射。
显式更新架构以更改列注释或排序
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
若要更改嵌套字段中的列,请使用:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
例如,如果运行 ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST 之前的架构为:
- root
| - colA
| - colB
| +-field1
| +-field2
调整后的架构为:
- root
| - colA
| - colB
| +-field2
| +-field1
显式更新架构以替换列
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
例如,运行以下 DDL 时:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
如果之前的模式是:
- root
| - colA
| - colB
| +-field1
| +-field2
其结果的架构为:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
显式更新架构以重命名列
注意
此功能在 Databricks Runtime 10.4 LTS 和更高版本中可用。
若要在不重写任何列的现有数据的情况下重命名列,必须启用表的列映射。 请参阅使用 Delta Lake 列映射重命名和删除列。
要重命名列,请执行以下步骤:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
重命名嵌套字段:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
例如,运行以下命令时:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
如果之前的方案为:
- root
| - colA
| - colB
| +-field1
| +-field2
之后的架构为:
- root
| - colA
| - colB
| +-field001
| +-field2
显式更新架构以删除列
注意
此功能在 Databricks Runtime 11.3 LTS 和更高版本中可用。
必须为表启用列映射,以便在无需重写任何数据文件的情况下通过元数据操作删除列。 请参阅使用 Delta Lake 列映射重命名和删除列。
重要
从元数据中删除列不会删除文件中该列的基础数据。 若要清除已删除的列数据,可以使用 REORG TABLE 重写文件。 然后,可以使用 VACUUM 以物理方式删除包含已删除的列数据的文件。
删除一列:
ALTER TABLE table_name DROP COLUMN col_name
删除多列:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
显式更新架构以更改列类型或名称
可以通过重写表来更改列的类型或名称或删除列。 为此,请使用 overwriteSchema 选项。
以下示例演示如何更改列类型:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
以下示例演示如何更改列名称:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
启用架构演变
使用以下方法之一启用架构演变:
-
设置
mergeSchema选项:适用于INSERT,批量写入或流式写入。 针对单个写入操作设置.option("mergeSchema", "true")。 -
使用
MERGE WITH SCHEMA EVOLUTION语法: 适用于MERGE语句。 在 SQL 语法中包括WITH SCHEMA EVOLUTION或者使用 Azure Databricks API 中的.withSchemaEvolution()。 -
设置 Spark 配置(旧版):将
spark.databricks.delta.schema.autoMerge.enabled设置为true以应用于整个 SparkSession。 不建议用于生产用途。
Databricks 建议为每次写入操作启用架构演变,使用 mergeSchema 选项或 WITH SCHEMA EVOLUTION 语法,而不是设置 Spark 配置。
使用选项或语法在写入操作中启用架构演变时,该设置优先于 Spark 配置。
注意
INSERT INTO 语句没有 WITH SCHEMA EVOLUTION 子句。 请改用此选项 mergeSchema 。 请参阅为写入操作启用架构演变以添加新列。
为写入操作启用架构演变以添加新列
启用架构演变时,源查询中存在但目标表中缺少的列会自动添加为写入事务的一部分。 请参阅启用架构演变。
追加新列时,会保留大小写。 新列将添加到表架构的末尾。 如果其他列位于结构中,它们将被追加到目标表中结构的末尾。
注意
与支持 SQL 语法的MERGE语句不同,INSERT语句使用mergeSchema选项来启用架构演变。 没有 WITH SCHEMA EVOLUTION 子句的 INSERT INTO 语句。
INSERT 使用 DataFrame API 进行模式演变
以下示例演示如何将 mergeSchema 选项与批处理写入操作配合使用:
Python
(spark.read
.table("source_table")
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("target_table")
)
Scala(编程语言)
spark.read
.table("source_table")
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("target_table")
INSERT 流式处理中的架构演变
以下示例演示如何将 mergeSchema 选项与自动加载程序配合使用。 请参阅什么是自动加载程序?。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
合并的自动模式演变
通过架构演变,可以解决合并中目标表和源表之间的架构不匹配问题。 此功能可处理以下两种情况:
源表中存在列,但不存在目标表,并在插入或更新作分配中按名称指定。 存在
UPDATE SET *或INSERT *动作。该列将添加到目标架构,并且将从源中的相应列填充其值。
这仅适用于合并源中的列名称和结构与目标分配完全匹配时。
新列必须存在于源架构中。 在 action 子句中分配新列不会定义该列。
这些示例允许架构演变:
-- The column newcol is present in the source but not in the target. It will be added to the target. UPDATE SET target.newcol = source.newcol -- The field newfield doesn't exist in struct column somestruct of the target. It will be added to that struct column. UPDATE SET target.somestruct.newfield = source.somestruct.newfield -- The column newcol is present in the source but not in the target. -- It will be added to the target. UPDATE SET target.newcol = source.newcol + 1 -- Any columns and nested fields in the source that don't exist in target will be added to the target. UPDATE SET * INSERT *如果架构中
newcol不存在列source,则这些示例不会触发架构演变:UPDATE SET target.newcol = source.someothercol UPDATE SET target.newcol = source.x + source.y UPDATE SET target.newcol = source.output.newcol目标表中存在列,但不存在源表。
目标架构不会更改。 这些列:
保持不变
UPDATE SET *。设置为
NULL以用于INSERT *。如果在操作子句中被分配,仍可能显式修改。
例如:
UPDATE SET * -- The target columns that are not in the source are left unchanged. INSERT * -- The target columns that are not in the source are set to NULL. UPDATE SET target.onlyintarget = 5 -- The target column is explicitly updated. UPDATE SET target.onlyintarget = source.someothercol -- The target column is explicitly updated from some other source column.
必须手动启用自动架构演变。 请参阅启用架构演变。
注意
在 Databricks Runtime 12.2 LTS 及更高版本中,可以在插入或更新操作中按名称指定源表中的列和结构字段。 在 Databricks Runtime 11.3 LTS 及以下版本中,只有 INSERT * 或 UPDATE SET * 操作可用于通过合并进行架构演变。
在 Databricks Runtime 13.3 LTS 及更高版本中,可以将模式演变与在映射中嵌套的结构一起使用,例如 map<int, struct<a: int, b: int>>。
用于合并的架构演变语法
在 Databricks Runtime 15.4 LTS 及更高版本中,可以使用 SQL 或表 API 在合并语句中指定架构演变:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala(编程语言)
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
合并与架构演变的示例操作
以下示例展示了在有架构演变和没有架构演变的情况下 merge 操作的效果。
| 列 | 查询(在 SQL 中) | 无架构演变的行为(默认值) | 有架构演变的行为 |
|---|---|---|---|
目标列:key, value源列: key, value, new_value |
MERGE INTO target_table tUSING source_table sON t.key = s.keyWHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT * |
表架构保持不变;仅更新/插入列 key、value。 |
表架构更改为 (key, value, new_value)。 使用源中的 value 和 new_value 更新具有匹配项的现有记录。 使用架构 (key, value, new_value) 插入新行。 |
目标列:key, old_value源列: key, new_value |
MERGE INTO target_table tUSING source_table sON t.key = s.keyWHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT * |
UPDATE 和 INSERT 操作会引发错误,因为目标列 old_value 不在源中。 |
表架构更改为 (key, old_value, new_value)。 具有匹配项的现有记录会使用源中的 new_value 进行更新,而 old_value 保持不变。 使用指定的 key、new_value 和 NULL,为 old_value 插入新记录。 |
目标列:key, old_value源列: key, new_value |
MERGE INTO target_table tUSING source_table sON t.key = s.keyWHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE 引发错误,因为目标表中不存在列 new_value。 |
表架构更改为 (key, old_value, new_value)。 使用源中的 new_value 更新已有匹配项的记录,同时保持 old_value 不变。对于不匹配的记录,将 NULL 输入到 new_value。 参阅注释 (1)。 |
目标列:key, old_value源列: key, new_value |
MERGE INTO target_table tUSING source_table sON t.key = s.keyWHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT 引发错误,因为目标表中不存在列 new_value。 |
表架构更改为 (key, old_value, new_value)。 指定 key、new_value 和 NULL 用于插入新的 old_value 记录。 现有记录已为 new_value 输入 NULL,并且 old_value 保持不变。 参阅注释 (1)。 |
(1) 此行为在 Databricks Runtime 12.2 及更高版本中可用;Databricks Runtime 11.3 LTS 及以下版本在这种情况下会出错。
排除已合并的列
在 Databricks Runtime 12.2 LTS 及更高版本中,可以在合并条件中使用 EXCEPT 子句显式排除列。
EXCEPT 关键字的行为因是否启用架构演变而异。
禁用架构演变后,EXCEPT 关键字将应用于目标表中的列列表,并允许从 UPDATE 或 INSERT 操作中排除列。 被排除的列被设置为 null。
启用架构演变后,EXCEPT 关键字将应用于源表中的列列表,并允许从架构演变中排除列。 如果 EXCEPT 子句中列出了源中不存在的新列,则不会将其添加到目标架构中。 目标中已存在的被排除列将被设置为 null。
以下示例演示了这些语法:
| 列 | 查询(在 SQL 中) | 无架构演变的行为(默认值) | 架构演变中的行为 |
|---|---|---|---|
目标列:id, title, last_updated源列: id, title, review, last_updated |
MERGE INTO target tUSING source sON t.id = s.idWHEN MATCHED THEN UPDATE SET last_updated = current_date()WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
通过将 last_updated 字段设置为当前日期来更新匹配的行。 使用 id 和 title 的值插入新行。 排除的字段 last_updated 设置为 null。
review 字段被忽略,因为它不在目标中。 |
通过将 last_updated 字段设置为当前日期来更新匹配的行。 架构已演变为添加 review 字段。 插入新行时,使用所有源字段,除last_updated外,该字段将被设置为null。 |
目标列:id, title, last_updated源列: id, title, review, internal_count |
MERGE INTO target tUSING source sON t.id = s.idWHEN MATCHED THEN UPDATE SET last_updated = current_date()WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT 引发错误,因为目标表中不存在列 internal_count。 |
通过将 last_updated 字段设置为当前日期来更新匹配的行。
review 字段将添加到目标表,但忽略该 internal_count 字段。 插入的新行已将 last_updated 设置为 null。 |
使用 Spark 配置启用架构演变(旧版)
可以将 Spark 配置 spark.databricks.delta.schema.autoMerge.enabled 设置为 true 启用当前 SparkSession 中所有写入作的架构演变:
Python
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True)
Scala(编程语言)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", true)
SQL
SET spark.databricks.delta.schema.autoMerge.enabled=true
重要
不建议使用此方法进行生产使用。 而是为每个写入操作启用架构演变:
- 对于
INSERT和批处理/流式写入,请使用.option("mergeSchema", "true") - 对于
MERGE语句,请使用MERGE WITH SCHEMA EVOLUTION
设置会话范围的配置可能会导致多个操作中发生意外的架构更改,并使确定哪些操作会演变架构更加困难。
当您使用选项或语法在写入操作中启用架构演变时,这将优先于 Spark 配置。
处理架构更新中的 NullType 列
由于 Parquet 不支持 NullType, NullType 因此在写入表时会从 DataFrame 中删除列,但仍存储在架构中。 当收到该列的其他数据类型时,架构将合并到新的数据类型。 如果收到现有列的NullType,则会保留旧模式,新列将在写入时被删除。
不支持流式处理中的 NullType。 使用流式处理时必须设置架构,所以这种情况应该非常罕见。 对于复杂类型(例如 NullType 和 ArrayType),也不会接受 MapType。
替换表架构
默认情况下,覆盖表中的数据不会覆盖架构。 在不使用 mode("overwrite") 的情况下使用 replaceWhere 来覆盖表时,你可能还希望覆盖写入的数据的架构。 你可以通过将 overwriteSchema 选项设置为 true 来替换表的架构和分区:
df.write.option("overwriteSchema", "true")
重要
使用动态分区覆盖时,不能将 overwriteSchema 指定为 true。