Compartir vía


Actualización del esquema de tabla de Delta Lake

Delta Lake permite actualizar el esquema de una tabla. Se admiten los siguientes tipos de cambios:

  • Adición de nuevas columnas (en posiciones arbitrarias)
  • Reordenación de las columnas existentes
  • Cambio de nombre de las columnas existentes

Puede realizar estos cambios explícitamente mediante el DDL o implícitamente mediante el DML.

Importante

Una actualización de un esquema de tabla Delta es una operación que entra en conflicto con todas las operaciones simultáneas de escritura Delta.

Cuando se actualiza el esquema de una tabla Delta, se finalizan las transmisiones que leen desde esa tabla. Si quiere que la transmisión continúe, debe reiniciarla. Para conocer los métodos más recomendables, consulte Consideraciones de producción para Structured Streaming.

Actualizar explícitamente el esquema para agregar columnas

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

De manera predeterminada, la nulabilidad es true.

Para agregar una columna a un campo anidado, use:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Por ejemplo, si el esquema antes de ejecutarse ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) es:

- root
| - colA
| - colB
| +-field1
| +-field2

El esquema después es:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Nota:

La adición de columnas anidadas solo se admite para estructuras. No se admiten matrices ni mapas.

Actualizar explícitamente el esquema para cambiar el comentario o la ordenación de columnas

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Para cambiar una columna en un campo anidado, use:

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Por ejemplo, si el esquema antes de ejecutarse ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST es:

- root
| - colA
| - colB
| +-field1
| +-field2

El esquema después es:

- root
| - colA
| - colB
| +-field2
| +-field1

Actualizar explícitamente el esquema para reemplazar columnas

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

Por ejemplo, al ejecutar el siguiente DDL:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

Si el esquema antes es:

- root
| - colA
| - colB
| +-field1
| +-field2

El esquema después es:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

Actualizar explícitamente el esquema para cambiar el nombre de las columnas

Nota:

Esta característica está disponible en Databricks Runtime 10.4 LTS y versiones posteriores.

Para cambiar el nombre de las columnas sin reescribir los datos existentes de las columnas, debe habilitar la asignación de columnas para la tabla. Consulte Cambio de nombre y eliminación de columnas con la asignación de columnas de Delta Lake.

Para cambiar el nombre de una columna:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

Para cambiar el nombre de un campo anidado:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

Por ejemplo, cuando ejecute el siguiente comando:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

Si el esquema antes es:

- root
| - colA
| - colB
| +-field1
| +-field2

El esquema después es:

- root
| - colA
| - colB
| +-field001
| +-field2

Consulte Cambio de nombre y eliminación de columnas con la asignación de columnas de Delta Lake.

Actualizar explícitamente el esquema para quitar columnas

Nota:

Esta característica está disponible en Databricks Runtime 11.3 LTS y versiones posteriores.

Para eliminar columnas como una operación de solo metadatos sin volver a escribir ningún archivo de datos, debe habilitar la asignación de columnas para la tabla. Consulte Cambio de nombre y eliminación de columnas con la asignación de columnas de Delta Lake.

Importante

La eliminación de una columna de los metadatos no elimina los datos subyacentes de la columna en los archivos. Para purgar los datos de la columna eliminada, puede usar REORG TABLE para reescribir los archivos. Después, puede usar VACUUM para eliminar físicamente los archivos que contienen los datos de la columna eliminada.

Para eliminar una columna:

ALTER TABLE table_name DROP COLUMN col_name

Para eliminar varias columnas:

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

Actualizar explícitamente el esquema para cambiar el tipo o el nombre de columna

Puede cambiar el tipo o el nombre de una columna o anular una columna mediante la reescritura de la tabla. Para ello, use la opción overwriteSchema.

En el ejemplo siguiente se muestra cómo cambiar un tipo de columna:

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

En el ejemplo siguiente se muestra cómo cambiar un nombre de columna:

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Habilitación de la evolución del esquema

Puede habilitar la evolución del esquema realizando una de las siguientes acciones:

Databricks recomienda habilitar la evolución del esquema para cada operación de escritura en lugar de establecer una configuración de Spark.

Cuando se usan opciones o sintaxis para habilitar la evolución del esquema en una operación de escritura, esto tiene prioridad sobre la configuración de Spark.

Nota:

No hay ninguna cláusula de evolución de esquema para instrucciones INSERT INTO.

Habilitación de la evolución del esquema para las escrituras para agregar nuevas columnas

Las columnas que están presentes en la consulta de origen pero que faltan en la tabla objetivo se agregan automáticamente como parte de una transacción de escritura cuando la evolución del esquema está habilitada. Consulte Habilitación de la evolución del esquema.

Las mayúsculas y minúsculas se conservan al anexar una nueva columna. Las columnas nuevas se combinan al final del esquema de tabla. Si las columnas adicionales están en una estructura, se anexan al final de la estructura de la tabla de destino.

En el siguiente ejemplo se muestra cómo usar la opción mergeSchema con Auto Loader. Consulte ¿Qué es Auto Loader?.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

En el ejemplo siguiente se muestra cómo usar la opción mergeSchema con una operación de escritura por lotes:

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

Evolución automática del esquema para la combinación de Delta Lake

La evolución del esquema permite a los usuarios resolver errores de coincidencia de esquema entre las tablas de destino y de origen en combinación. Controla los dos casos siguientes:

  1. Una columna de la tabla de origen no está presente en la tabla de destino. La nueva columna se agrega al esquema de destino y sus valores se insertan o actualizan mediante los valores de origen.
  2. Una columna de la tabla de destino no está presente en la tabla de origen. El esquema de destino se deja sin cambios; Los valores de la columna de destino adicional se dejan sin cambios (para UPDATE) o se establecen en NULL (para INSERT).

Debe habilitar manualmente la evolución automática del esquema. Consulte Habilitación de la evolución del esquema.

Nota:

En Databricks Runtime 12.2 LTS y versiones posteriores, los campos de estructura y columnas presentes en la tabla de origen se pueden especificar por nombre en las acciones de inserción o actualización. En Databricks Runtime 11.3 LTS y versiones posteriores, solo se pueden usar acciones INSERT * o UPDATE SET * para la evolución del esquema con combinación.

En Databricks Runtime 13.3 LTS y versiones posteriores, puede usar la evolución del esquema con estructuras anidadas dentro de mapas, como map<int, struct<a: int, b: int>>.

Sintaxis de evolución del esquema para fusión mediante combinación

En Databricks Runtime 15.2 y versiones posteriores, puede especificar la evolución del esquema en una instrucción de fusión mediante combinación usando SQL o las API de tabla Delta:

SQL

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Python

from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

Operaciones de ejemplo de combinación con la evolución del esquema

Estos son algunos ejemplos de los efectos de la operación merge con y sin evolución del esquema.

Columnas Consulta (en SQL) Comportamiento sin evolución del esquema (valor predeterminado) Comportamiento con evolución del esquema
Columnas de destino: key, value

Columnas de origen: key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
El esquema de tabla permanece sin cambios; solo se actualizan o insertan las columnas key y value. El esquema de tabla se cambia a (key, value, new_value). Los registros existentes con coincidencias se actualizan con value y new_value en el origen. Las filas nuevas se insertan con el esquema (key, value, new_value).
Columnas de destino: key, old_value

Columnas de origen: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Las acciones UPDATE y INSERT inician un error porque la columna de destino old_value no está en el origen. El esquema de tabla se cambia a (key, old_value, new_value). Los registros existentes con coincidencias se actualizan con new_value en el origen y se deja old_value sin cambios. Los nuevos registros se insertan con los valores key, new_value y NULL especificados para old_value.
Columnas de destino: key, old_value

Columnas de origen: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE genera un error porque la columna new_value no existe en la tabla de destino. El esquema de tabla se cambia a (key, old_value, new_value). Los registros existentes con coincidencias se actualizan con new_value en el origen y se deja old_value sin cambios. En los registros no coincidentes se especifica NULL para new_value. Consulte la nota (1).
Columnas de destino: key, old_value

Columnas de origen: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT genera un error porque la columna new_value no existe en la tabla de destino. El esquema de tabla se cambia a (key, old_value, new_value). Los nuevos registros se insertan con los valores key, new_value y NULL especificados para old_value. En los registros existentes se especifica NULL para new_value y se deja old_value sin cambios. Consulte la nota (1).

(1) Este comportamiento está disponible en Databricks Runtime 12.2 LTS y versiones posteriores; Databricks Runtime 11.3 LTS y un error inferior en esta condición.

Excluir columnas con combinación de Delta Lake

En Databricks Runtime 12.2 LTS y versiones posteriores, puede usar cláusulas EXCEPT en condiciones de combinación para excluir explícitamente columnas. El comportamiento de la palabra clave EXCEPT varía en función de si está habilitada o no la evolución del esquema.

Con la evolución del esquema deshabilitada, la palabra clave EXCEPT se aplica a la lista de columnas de la tabla de destino y permite excluir columnas de acciones UPDATE o INSERT. Las columnas excluidas se establecen en null.

Con la evolución del esquema habilitada, la palabra clave EXCEPT se aplica a la lista de columnas de la tabla de origen y permite excluir columnas de la evolución del esquema. Una nueva columna del origen que no está presente en el destino no se agrega al esquema de destino si aparece en la cláusula EXCEPT. Las columnas excluidas que ya están presentes en el destino se establecen en null.

En los ejemplos siguientes se muestra esta sintaxis:

Columnas Consulta (en SQL) Comportamiento sin evolución del esquema (valor predeterminado) Comportamiento con evolución del esquema
Columnas de destino: id, title, last_updated

Columnas de origen: id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
Las filas coincidentes se actualizan estableciendo el campo last_updated en la fecha actual. Las filas nuevas se insertan mediante valores de id y title. El campo last_updated excluido se establece en null. El campo review se omite porque no está en el destino. Las filas coincidentes se actualizan estableciendo el campo last_updated en la fecha actual. El esquema ha evolucionado para agregar el campo review. Las filas nuevas se insertan con todos los campos de origen, excepto last_updated que se establece en null.
Columnas de destino: id, title, last_updated

Columnas de origen: id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT genera un error porque la columna internal_count no existe en la tabla de destino. Las filas coincidentes se actualizan estableciendo el campo last_updated en la fecha actual. El campo review se agrega a la tabla de destino, pero se omite el campo internal_count. Las filas nuevas insertadas tienen last_updated establecido en null.

Trabajar con columnas NullType en actualizaciones de esquemas

Dado que Parquet no admite NullType, las columnas NullType se anulan en el DataFrame al escribir en tablas Delta, pero se siguen almacenando en el esquema. Cuando se recibe un tipo de datos diferente para esa columna, Delta Lake combina el esquema con el nuevo tipo de datos. Si Delta Lake recibe un parámetro NullType para una columna existente, se conserva el esquema antiguo y se anula la nueva columna durante la escritura.

No se admite NullType en streaming. Como debe establecer esquemas al usar el streaming, esto debería ser muy poco frecuente. NullType tampoco se acepta para tipos complejos, como ArrayType y MapType.

Reemplazo de esquema de tabla

De manera predeterminada, la sobrescritura de los datos en una tabla no sobrescribe el esquema. Al sobrescribir una tabla mediante mode("overwrite") sin replaceWhere, es posible que quiera sobrescribir el esquema de los datos que se escriben. Establezca la opción overwriteSchema en true para reemplazar el esquema y la creación de particiones de la tabla:

df.write.option("overwriteSchema", "true")

Importante

No se puede especificar overwriteSchema como true cuando se usa la sobrescritura de partición dinámica.