Compartir a través de


Sobrescritura de datos de forma selectiva con Delta Lake

Azure Databricks saca provecho de la funcionalidad de Delta Lake para admitir dos opciones distintas para sobrescrituras selectivas:

  • La opción replaceWhere reemplaza de forma atómica todos los registros que coinciden con un predicado determinado.
  • Puede reemplazar directorios de datos en función de cómo se particionan las tablas con sobrescrituras de particiones dinámicas.

Para la mayoría de las operaciones, Databricks recomienda usar replaceWhere para especificar qué datos sobrescribir.

Important

Si los datos se han sobrescrito accidentalmente, puede usar la restauración para deshacer el cambio.

Sobrescritura selectiva arbitraria con replaceWhere

Puede sobrescribir selectivamente solo los datos que coincidan con una expresión arbitraria.

Note

SQL requiere Databricks Runtime 12.2 LTS o superior.

El comando siguiente reemplaza de forma atómica los eventos de enero en la tabla de destino, que está particionada por start_date, con los datos en replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Este código de ejemplo escribe los datos en replace_data, valida que todas las filas coincidan con el predicado y realiza un reemplazo atómico mediante semánticas overwrite. Si alguno de los valores de la operación se encuentra fuera de la restricción, esta operación produce un error de forma predeterminada.

Puede cambiar este comportamiento a valores overwrite dentro del intervalo de predicado y los registros insert que se encuentran fuera del intervalo especificado. Para ello, deshabilite la comprobación de restricciones estableciendo spark.databricks.delta.replaceWhere.constraintCheck.enabled en false con una de las siguientes opciones:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Comportamiento heredado

El comportamiento predeterminado heredado tenía que replaceWhere sobrescribiera los datos que coinciden con un predicado solo sobre las columnas de partición. Con este modelo heredado, el siguiente comando reemplazaría atómicamente el mes de enero en la tabla de destino, que es particionado por date, por los datos de df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

Si desea revertir al comportamiento anterior, puede deshabilitar la marca spark.databricks.delta.replaceWhere.dataColumns.enabled:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Sobrescritura de particiones dinámicas

Las actualizaciones de sobrescritura de particiones dinámicas solo afectan a las particiones para las que la escritura realiza nuevos datos. Sobrescribe todos los datos existentes en esas particiones y deja a otros sin cambios.

Azure Databricks admite dos enfoques:

  • REPLACE USING (recomendado): funciona en todos los tipos de proceso, incluidos los almacenes de SQL de Databricks, el proceso sin servidor y el proceso clásico. No requiere establecer una configuración de sesión de Spark.
  • partitionOverwriteMode (heredado): requiere cómputo clásico y configurar una sesión de Spark. No se admite en Databricks SQL o proceso sin servidor.

En las secciones siguientes se muestra cómo usar cada enfoque.

Sobrescribir particiones dinámicas con REPLACE USING

Databricks Runtime 16.3 y versiones posteriores admiten sobrescrituras de particiones dinámicas para tablas con particiones mediante REPLACE USING. Este método permite sobrescribir datos de forma selectiva en todos los tipos de proceso, sin necesidad de establecer una configuración de sesión de Spark. REPLACE USING habilita un comportamiento de reescritura atómica independiente del procesamiento que funciona en almacenes de SQL de Databricks, procesamiento sin servidor y procesamiento clásico.

REPLACE USING solo sobrescribe las particiones destinadas a los datos entrantes. Todas las demás particiones permanecen sin cambios.

En el ejemplo siguiente se muestra cómo usar la sobrescritura de particiones dinámicas con REPLACE USING. Actualmente, solo puede usar SQL, no Python o Scala. Para obtener más información, consulte en INSERT la referencia del lenguaje SQL.

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

Tenga en cuenta las siguientes restricciones y comportamientos para sobrescribir particiones dinámicas:

  • Debe especificar el conjunto completo de columnas de partición de la tabla en la USING cláusula .
  • Valide siempre que los datos escritos solo toquen las particiones esperadas. Una única fila en la partición incorrecta puede sobrescribir de manera involuntaria toda la partición.

Si necesita una lógica de coincidencia más personalizable que la que REPLACE USING admite, como tratar NULL los valores como iguales, use la complementaria REPLACE ON en su lugar. Consulte INSERT para obtener más información.

Sobrescritura de particiones dinámicas con partitionOverwriteMode (legado)

Important

Esta característica está en versión preliminar pública.

Databricks Runtime 11.3 LTS y versiones posteriores admite sobrescrituras de particiones dinámicas para tablas con particiones mediante el modo de sobrescritura: ya sea INSERT OVERWRITE en SQL o en una escritura de DataFrame con df.write.mode("overwrite"). Este tipo de sobrescritura solo está disponible para el proceso clásico, no para los almacenes de SQL de Databricks ni para el proceso sin servidor.

Configure el modo de sobrescritura dinámica de particiones estableciendo la configuración de sesión de Spark spark.sql.sources.partitionOverwriteMode en dynamic. Como alternativa, puede establecer la DataFrameWriter opción partitionOverwriteMode en dynamic. Si está presente, la opción específica de la consulta invalida el modo definido en la configuración de sesión. El valor predeterminado de spark.sql.sources.partitionOverwriteMode es static.

El ejemplo siguiente muestra cómo usar partitionOverwriteMode:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Tenga en cuenta las siguientes restricciones y comportamientos para partitionOverwriteMode:

  • No se puede establecer overwriteSchema a true.
  • No puede especificar ambos partitionOverwriteMode y replaceWhere en la misma operación DataFrameWriter.
  • Si especifica una replaceWhere condición mediante una DataFrameWriter opción, Delta Lake aplica esa condición para controlar qué datos se sobrescriben. Esta opción tiene prioridad sobre la partitionOverwriteMode configuración de nivel de sesión.
  • Valide siempre que los datos escritos solo toquen las particiones esperadas. Una única fila en la partición incorrecta puede sobrescribir de manera involuntaria toda la partición.