Actualización del esquema de tabla de Delta Lake
Delta Lake permite actualizar el esquema de una tabla. Se admiten los siguientes tipos de cambios:
- Adición de nuevas columnas (en posiciones arbitrarias)
- Reordenación de las columnas existentes
- Cambio de nombre de las columnas existentes
Puede realizar estos cambios explícitamente mediante el DDL o implícitamente mediante el DML.
Importante
Una actualización de un esquema de tabla Delta es una operación que entra en conflicto con todas las operaciones simultáneas de escritura Delta.
Cuando se actualiza el esquema de una tabla Delta, se finalizan las transmisiones que leen desde esa tabla. Si quiere que la transmisión continúe, debe reiniciarla. Para conocer los métodos más recomendables, consulte Consideraciones de producción para Structured Streaming.
Actualizar explícitamente el esquema para agregar columnas
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
De manera predeterminada, la nulabilidad es true
.
Para agregar una columna a un campo anidado, use:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Por ejemplo, si el esquema antes de ejecutarse ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
es:
- root
| - colA
| - colB
| +-field1
| +-field2
El esquema después es:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Nota:
La adición de columnas anidadas solo se admite para estructuras. No se admiten matrices ni mapas.
Actualizar explícitamente el esquema para cambiar el comentario o la ordenación de columnas
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Para cambiar una columna en un campo anidado, use:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Por ejemplo, si el esquema antes de ejecutarse ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
es:
- root
| - colA
| - colB
| +-field1
| +-field2
El esquema después es:
- root
| - colA
| - colB
| +-field2
| +-field1
Actualizar explícitamente el esquema para reemplazar columnas
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Por ejemplo, al ejecutar el siguiente DDL:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
Si el esquema antes es:
- root
| - colA
| - colB
| +-field1
| +-field2
El esquema después es:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Actualizar explícitamente el esquema para cambiar el nombre de las columnas
Nota:
Esta característica está disponible en Databricks Runtime 10.4 LTS y versiones posteriores.
Para cambiar el nombre de las columnas sin reescribir los datos existentes de las columnas, debe habilitar la asignación de columnas para la tabla. Consulte Cambio de nombre y eliminación de columnas con la asignación de columnas de Delta Lake.
Para cambiar el nombre de una columna:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Para cambiar el nombre de un campo anidado:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Por ejemplo, cuando ejecute el siguiente comando:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Si el esquema antes es:
- root
| - colA
| - colB
| +-field1
| +-field2
El esquema después es:
- root
| - colA
| - colB
| +-field001
| +-field2
Consulte Cambio de nombre y eliminación de columnas con la asignación de columnas de Delta Lake.
Actualizar explícitamente el esquema para quitar columnas
Nota:
Esta característica está disponible en Databricks Runtime 11.3 LTS y versiones posteriores.
Para eliminar columnas como una operación de solo metadatos sin volver a escribir ningún archivo de datos, debe habilitar la asignación de columnas para la tabla. Consulte Cambio de nombre y eliminación de columnas con la asignación de columnas de Delta Lake.
Importante
La eliminación de una columna de los metadatos no elimina los datos subyacentes de la columna en los archivos. Para purgar los datos de la columna eliminada, puede usar REORG TABLE para reescribir los archivos. Después, puede usar VACUUM para eliminar físicamente los archivos que contienen los datos de la columna eliminada.
Para eliminar una columna:
ALTER TABLE table_name DROP COLUMN col_name
Para eliminar varias columnas:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Actualizar explícitamente el esquema para cambiar el tipo o el nombre de columna
Puede cambiar el tipo o el nombre de una columna o anular una columna mediante la reescritura de la tabla. Para ello, use la opción overwriteSchema
.
En el ejemplo siguiente se muestra cómo cambiar un tipo de columna:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
En el ejemplo siguiente se muestra cómo cambiar un nombre de columna:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Habilitación de la evolución del esquema
Puede habilitar la evolución del esquema realizando una de las siguientes acciones:
- Establezca
.option("mergeSchema", "true")
en una trama de datoswrite
o una operaciónwriteStream
de Spark. Consulte Habilitación de la evolución del esquema para las escrituras para agregar nuevas columnas. - Use sintaxis
MERGE WITH SCHEMA EVOLUTION
. Consulte Sintaxis de evolución del esquema para combinar. - Establezca la configuración de Spark
spark.databricks.delta.schema.autoMerge.enabled
entrue
para la SparkSession actual.
Databricks recomienda habilitar la evolución del esquema para cada operación de escritura en lugar de establecer una configuración de Spark.
Cuando se usan opciones o sintaxis para habilitar la evolución del esquema en una operación de escritura, esto tiene prioridad sobre la configuración de Spark.
Nota:
No hay ninguna cláusula de evolución de esquema para instrucciones INSERT INTO
.
Habilitación de la evolución del esquema para las escrituras para agregar nuevas columnas
Las columnas que están presentes en la consulta de origen pero que faltan en la tabla objetivo se agregan automáticamente como parte de una transacción de escritura cuando la evolución del esquema está habilitada. Consulte Habilitación de la evolución del esquema.
Las mayúsculas y minúsculas se conservan al anexar una nueva columna. Las columnas nuevas se combinan al final del esquema de tabla. Si las columnas adicionales están en una estructura, se anexan al final de la estructura de la tabla de destino.
En el siguiente ejemplo se muestra cómo usar la opción mergeSchema
con Auto Loader. Consulte ¿Qué es Auto Loader?.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
En el ejemplo siguiente se muestra cómo usar la opción mergeSchema
con una operación de escritura por lotes:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Evolución automática del esquema para la combinación de Delta Lake
La evolución del esquema permite a los usuarios resolver errores de coincidencia de esquema entre las tablas de destino y de origen en combinación. Controla los dos casos siguientes:
- Una columna de la tabla de origen no está presente en la tabla de destino. La nueva columna se agrega al esquema de destino y sus valores se insertan o actualizan mediante los valores de origen.
- Una columna de la tabla de destino no está presente en la tabla de origen. El esquema de destino se deja sin cambios; Los valores de la columna de destino adicional se dejan sin cambios (para
UPDATE
) o se establecen enNULL
(paraINSERT
).
Debe habilitar manualmente la evolución automática del esquema. Consulte Habilitación de la evolución del esquema.
Nota:
En Databricks Runtime 12.2 LTS y versiones posteriores, los campos de estructura y columnas presentes en la tabla de origen se pueden especificar por nombre en las acciones de inserción o actualización. En Databricks Runtime 11.3 LTS y versiones posteriores, solo se pueden usar acciones INSERT *
o UPDATE SET *
para la evolución del esquema con combinación.
En Databricks Runtime 13.3 LTS y versiones posteriores, puede usar la evolución del esquema con estructuras anidadas dentro de mapas, como map<int, struct<a: int, b: int>>
.
Sintaxis de evolución del esquema para fusión mediante combinación
En Databricks Runtime 15.2 y versiones posteriores, puede especificar la evolución del esquema en una instrucción de fusión mediante combinación usando SQL o las API de tabla Delta:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Operaciones de ejemplo de combinación con la evolución del esquema
Estos son algunos ejemplos de los efectos de la operación merge
con y sin evolución del esquema.
Columnas | Consulta (en SQL) | Comportamiento sin evolución del esquema (valor predeterminado) | Comportamiento con evolución del esquema |
---|---|---|---|
Columnas de destino: key, value Columnas de origen: key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
El esquema de tabla permanece sin cambios; solo se actualizan o insertan las columnas key y value . |
El esquema de tabla se cambia a (key, value, new_value) . Los registros existentes con coincidencias se actualizan con value y new_value en el origen. Las filas nuevas se insertan con el esquema (key, value, new_value) . |
Columnas de destino: key, old_value Columnas de origen: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Las acciones UPDATE y INSERT inician un error porque la columna de destino old_value no está en el origen. |
El esquema de tabla se cambia a (key, old_value, new_value) . Los registros existentes con coincidencias se actualizan con new_value en el origen y se deja old_value sin cambios. Los nuevos registros se insertan con los valores key , new_value y NULL especificados para old_value . |
Columnas de destino: key, old_value Columnas de origen: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE genera un error porque la columna new_value no existe en la tabla de destino. |
El esquema de tabla se cambia a (key, old_value, new_value) . Los registros existentes con coincidencias se actualizan con new_value en el origen y se deja old_value sin cambios. En los registros no coincidentes se especifica NULL para new_value . Consulte la nota (1). |
Columnas de destino: key, old_value Columnas de origen: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT genera un error porque la columna new_value no existe en la tabla de destino. |
El esquema de tabla se cambia a (key, old_value, new_value) . Los nuevos registros se insertan con los valores key , new_value y NULL especificados para old_value . En los registros existentes se especifica NULL para new_value y se deja old_value sin cambios. Consulte la nota (1). |
(1) Este comportamiento está disponible en Databricks Runtime 12.2 LTS y versiones posteriores; Databricks Runtime 11.3 LTS y un error inferior en esta condición.
Excluir columnas con combinación de Delta Lake
En Databricks Runtime 12.2 LTS y versiones posteriores, puede usar cláusulas EXCEPT
en condiciones de combinación para excluir explícitamente columnas. El comportamiento de la palabra clave EXCEPT
varía en función de si está habilitada o no la evolución del esquema.
Con la evolución del esquema deshabilitada, la palabra clave EXCEPT
se aplica a la lista de columnas de la tabla de destino y permite excluir columnas de acciones UPDATE
o INSERT
. Las columnas excluidas se establecen en null
.
Con la evolución del esquema habilitada, la palabra clave EXCEPT
se aplica a la lista de columnas de la tabla de origen y permite excluir columnas de la evolución del esquema. Una nueva columna del origen que no está presente en el destino no se agrega al esquema de destino si aparece en la cláusula EXCEPT
. Las columnas excluidas que ya están presentes en el destino se establecen en null
.
En los ejemplos siguientes se muestra esta sintaxis:
Columnas | Consulta (en SQL) | Comportamiento sin evolución del esquema (valor predeterminado) | Comportamiento con evolución del esquema |
---|---|---|---|
Columnas de destino: id, title, last_updated Columnas de origen: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
Las filas coincidentes se actualizan estableciendo el campo last_updated en la fecha actual. Las filas nuevas se insertan mediante valores de id y title . El campo last_updated excluido se establece en null . El campo review se omite porque no está en el destino. |
Las filas coincidentes se actualizan estableciendo el campo last_updated en la fecha actual. El esquema ha evolucionado para agregar el campo review . Las filas nuevas se insertan con todos los campos de origen, excepto last_updated que se establece en null . |
Columnas de destino: id, title, last_updated Columnas de origen: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT genera un error porque la columna internal_count no existe en la tabla de destino. |
Las filas coincidentes se actualizan estableciendo el campo last_updated en la fecha actual. El campo review se agrega a la tabla de destino, pero se omite el campo internal_count . Las filas nuevas insertadas tienen last_updated establecido en null . |
Trabajar con columnas NullType
en actualizaciones de esquemas
Dado que Parquet no admite NullType
, las columnas NullType
se anulan en el DataFrame al escribir en tablas Delta, pero se siguen almacenando en el esquema. Cuando se recibe un tipo de datos diferente para esa columna, Delta Lake combina el esquema con el nuevo tipo de datos. Si Delta Lake recibe un parámetro NullType
para una columna existente, se conserva el esquema antiguo y se anula la nueva columna durante la escritura.
No se admite NullType
en streaming. Como debe establecer esquemas al usar el streaming, esto debería ser muy poco frecuente. NullType
tampoco se acepta para tipos complejos, como ArrayType
y MapType
.
Reemplazo de esquema de tabla
De manera predeterminada, la sobrescritura de los datos en una tabla no sobrescribe el esquema. Al sobrescribir una tabla mediante mode("overwrite")
sin replaceWhere
, es posible que quiera sobrescribir el esquema de los datos que se escriben. Establezca la opción overwriteSchema
en true
para reemplazar el esquema y la creación de particiones de la tabla:
df.write.option("overwriteSchema", "true")
Importante
No se puede especificar overwriteSchema
como true
cuando se usa la sobrescritura de partición dinámica.