通过


使用 Delta Lake 选择性地覆盖数据

Delta Lake 具有以下不同的选择性覆盖选项:

  • replaceWhere 选项以原子方式替换与给定谓词匹配的所有记录。
  • 可以根据表的分区方式使用动态分区覆盖来替换数据目录。

对于大多数操作,Databricks 建议使用 replaceWhere 来指定要覆盖的数据。

Important

如果数据被意外覆盖,则可以使用 还原 来撤消更改。

使用 replaceWhere 进行的任意选择性覆盖

可以有选择性地只覆盖与任意表达式匹配的数据。

Note

SQL 需要 Databricks Runtime 12.2 LTS 或更高版本。

例如,将分区表start_date在一月份的事件以原子方式替换为replace_data中的数据。

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

此示例代码在 replace_data中写出数据,验证所有行是否与谓词匹配,并使用 overwrite 语义执行原子替换。 如果操作中的任何值都超出约束范围,则此操作默认失败,并显示错误。

可以将此行为更改为谓词范围内的 overwrite 值和指定范围外的 insert 记录。 为此,请使用以下设置之一将 spark.databricks.delta.replaceWhere.constraintCheck.enabled 设置为 false 来禁用约束检查:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Note

REPLACE WHERE 接受的 boolean_expression 有一些限制。 请参阅 INSERT SQL 语言参考。

遗留行为

如果使用replaceWhere的传统行为,查询将仅匹配分区列的谓词,并忽略那些在谓词范围之外的数据。 以下命令会以原子方式替换目标表中按 date 分区的一月数据为 df

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")

若要使用旧行为,请将 spark.databricks.delta.replaceWhere.dataColumns.enabled 标志设置为 false

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

动态分区覆盖

动态分区覆盖仅更新写入提交新数据的分区,并使其他分区保持不变。

Azure Databricks建议使用REPLACE USING来触发动态分区覆盖。 此模式适用于所有计算类型,包括 Databricks SQL 仓库、无服务器计算和经典计算,无需设置 Spark 会话配置。 请参阅具有REPLACE USING的动态分区覆盖。

partitionOverwriteMode 是动态分区覆盖的旧模式,需要使用经典计算并设置 Spark 会话配置。 Databricks SQL 或无服务器计算不支持它。 请参阅动态分区覆盖(partitionOverwriteMode旧版)

以下各节演示如何使用每个模式。

使用 REPLACE USING 进行的动态分区覆盖

Databricks Runtime 16.3 及更高版本支持使用 REPLACE USING 表的动态分区覆盖。 你可以选择性地覆盖所有计算类型的数据,而无需设置 Spark 会话配置。 REPLACE USING 启用独立于计算的原子覆盖行为,适用于 Databricks SQL 仓库、无服务器计算和传统计算。

REPLACE USING 仅覆盖传入数据目标的分区。 所有其他分区保持不变。

REPLACE USING 仅在 SQL 中受支持。 有关详细信息,请参阅 INSERT SQL 语言参考。

以下示例使用 REPLACE USING

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

使用动态分区覆盖时请记住以下约束和行为:

  • 在 Databricks Runtime 17.2 及更高版本中,支持分区表、未分区表和具有液体聚类分析的表。
  • 在 Databricks Runtime 16.3 到 17.1 之间,仅支持分区表,并且必须在 USING 子句中指定表的所有分区列。
  • 始终验证写入的数据是否仅涉及预期的分区。 错误分区中的单行可能会无意中覆盖整个分区。

如果您需要比REPLACE USING提供的更多可自定义的匹配逻辑,比如将NULL的值视为相等,那么可以改用互补的REPLACE ON。 有关详细信息,请参阅 INSERT

使用 partitionOverwriteMode 的动态分区覆盖(旧版)

Important

此功能目前以公共预览版提供。

Databricks Runtime 11.3 LTS 及更高版本支持使用覆盖模式对分区表进行动态分区覆盖:SQL 中的 INSERT OVERWRITE,或带有 df.write.mode("overwrite") 的 DataFrame 写入。 这种类型的覆盖仅适用于经典计算,而不适用于 Databricks SQL 仓库或无服务器计算。

警告

如果可能,请使用 INSERT OVERWRITE REPLACE USING 而不是分区覆盖 INSERT OVERWRITE PARTITIONspark.sql.sources.partitionOverwriteMode=dynamic。 分区重写在分区更改时可能会引用过时的数据。

若要使用动态分区覆盖模式,请将 Spark 会话配置 spark.sql.sources.partitionOverwriteMode 设置为 dynamic。 或者,可以将选项DataFrameWriter设置为 partitionOverwriteModedynamic。 如果存在,查询特定选项将覆盖会话配置中定义的模式。 spark.sql.sources.partitionOverwriteMode 的默认值为 static

下面的示例使用 partitionOverwriteMode

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

请牢记以下 partitionOverwriteMode 的限制和特性:

  • 您不能将 overwriteSchema 设置为 true
  • 不能同时在同一个partitionOverwriteMode操作中指定replaceWhereDataFrameWriter
  • 如果使用 replaceWhere 指定 DataFrameWriter,Delta Lake 会应用该条件来控制要覆盖的数据。 此选项优先于 partitionOverwriteMode 会话级配置。
  • 始终验证写入的数据是否仅涉及预期的分区。 错误分区中的单行可能会无意中覆盖整个分区。