Compartir vía


Sobrescritura de datos de forma selectiva con Delta Lake

Azure Databricks saca provecho de la funcionalidad de Delta Lake para admitir dos opciones distintas para sobrescrituras selectivas:

  • La opción replaceWhere reemplaza de forma atómica todos los registros que coinciden con un predicado determinado.
  • Puede reemplazar directorios de datos en función de cómo se particionan las tablas con sobrescrituras de particiones dinámicas.

Para la mayoría de las operaciones, Databricks recomienda usar replaceWhere para especificar qué datos sobrescribir.

Importante

Si una partición se ha sobrescrito accidentalmente, puede usar restaurar para deshacer el cambio.

Sobrescritura selectiva arbitraria con replaceWhere

Puede sobrescribir selectivamente solo los datos que coincidan con una expresión arbitraria.

Nota:

SQL requiere Databricks Runtime 12.2 LTS o superior.

El comando siguiente reemplaza de forma atómica los eventos de enero en la tabla de destino, que está particionada por start_date, con los datos en replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Este código de ejemplo escribe los datos en replace_data, valida que todas las filas coincidan con el predicado y realiza un reemplazo atómico mediante semánticas overwrite. Si alguno de los valores de la operación se encuentra fuera de la restricción, esta operación produce un error de forma predeterminada.

Puede cambiar este comportamiento a valores overwrite dentro del intervalo de predicado y los registros insert que se encuentran fuera del intervalo especificado. Para ello, deshabilite la comprobación de restricciones estableciendo spark.databricks.delta.replaceWhere.constraintCheck.enabled en false con una de las siguientes opciones:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Comportamiento heredado

El comportamiento predeterminado heredado tenía que replaceWhere sobrescribiera los datos que coinciden con un predicado solo sobre las columnas de partición. Con este modelo heredado, el siguiente comando reemplazaría atómicamente el mes de enero en la tabla de destino, que es particionado por date, por los datos de df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

Si desea revertir al comportamiento anterior, puede deshabilitar la marca spark.databricks.delta.replaceWhere.dataColumns.enabled:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Sobrescritura de particiones dinámicas

Importante

Esta característica está en versión preliminar pública.

Databricks Runtime 11.3 LTS y superior soporta el modo de sobreescritura partición dinámica para tablas particionadas. En el caso de las tablas con varias particiones, Databricks Runtime 11.3 LTS y debajo solo admiten sobrescrituras de particiones dinámicas si todas las columnas de partición tienen el mismo tipo de datos.

En el modo de sobrescritura de partición dinámica, las operaciones sobrescriben todos los datos existentes en cada partición lógica para la que la escritura confirme los nuevos datos. Las particiones lógicas existentes para las que la escritura no contiene datos permanecerán sin cambios. Este modo solo es aplicable cuando los datos se escriben en modo de sobrescritura: ya sea INSERT OVERWRITE en SQL o una escritura de DataFrame con df.write.mode("overwrite").

Configure el modo de sobrescritura dinámica de particiones estableciendo la configuración de sesión de Spark spark.sql.sources.partitionOverwriteMode en dynamic. También puede habilitarlo estableciendo la opción DataFrameWriter de partitionOverwriteMode en dynamic. Si está presente, la opción específica de la consulta invalida el modo definido en la configuración de sesión. El valor predeterminado para partitionOverwriteMode es static.

Importante

Compruebe que los datos escritos con la sobrescritura dinámica de particiones tocan solo las particiones esperadas. Una sola fila en la partición incorrecta puede provocar que se sobrescriba accidentalmente una partición completa.

En el ejemplo siguiente se muestra el uso de sobrescrituras de partición dinámica:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Nota:

  • La sobrescritura dinámica de particiones entra en conflicto con la opción replaceWhere para las tablas con particiones.
    • Si la sobrescritura dinámica de particiones está habilitada en la configuración de sesión de Spark y replaceWhere se proporciona como opción de DataFrameWriter, Delta Lake sobrescribe los datos según la expresión replaceWhere (las opciones específicas de la consulta invalidan las configuraciones de sesión).
    • Recibirá un error si las opciones de DataFrameWriter tienen tanto la sobrescritura partición dinámica como replaceWhere habilitados.
  • No se puede especificar overwriteSchema como true cuando se usa la sobrescritura de partición dinámica.