Atualizar o esquema de tabela do Delta Lake
O Delta Lake permite atualizar o esquema de uma tabela. Há suporte aos seguintes tipos de alterações:
- Adição de novas colunas (em posições arbitrárias)
- Reordenação de colunas existentes
- Renomeação de colunas existentes
Você pode fazer essas alterações explicitamente com DDL ou implicitamente com DML.
Importante
Uma atualização em um esquema de tabela Delta é uma operação que entra em conflito com todas as operações de gravação Delta simultâneas.
Quando você atualiza um esquema de tabela Delta, os fluxos que leem dessa tabela são terminados. Se você quiser que o fluxo continue, precisará reiniciá-lo. Para obter métodos recomendados, confira Considerações de produção para Streaming Estruturado.
Atualizar explicitamente o esquema para adicionar colunas
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Por padrão, a nulidade é true
.
Para adicionar uma coluna a um campo aninhado, use:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Por exemplo, se o esquema anterior à execução de ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
for:
- root
| - colA
| - colB
| +-field1
| +-field2
o esquema após será:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Observação
A adição de colunas aninhadas tem suporte apenas para structs. Não há suporte a matrizes e mapas.
Atualizar explicitamente o esquema para alterar o comentário ou a ordenação da coluna
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Para alterar uma coluna em um campo aninhado, use:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Por exemplo, se o esquema anterior à execução de ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
for:
- root
| - colA
| - colB
| +-field1
| +-field2
o esquema após será:
- root
| - colA
| - colB
| +-field2
| +-field1
Atualizar explicitamente o esquema para substituir colunas
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Por exemplo, ao executar a seguinte DDL:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
Se o esquema antes for:
- root
| - colA
| - colB
| +-field1
| +-field2
o esquema após será:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Atualizar explicitamente o esquema para renomear colunas
Observação
Este recurso está disponível no Databricks Runtime 10.4 LTS e versões superiores.
Para renomear colunas sem regravar os dados existentes de uma coluna, você precisará habilitar o mapeamento de colunas para a tabela. Confira Renomear e remover colunas usando o mapeamento de colunas do Delta Lake.
Para renomear uma coluna:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Para renomear um campo aninhado:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Por exemplo, ao executar o seguinte comando:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Se o esquema antes for:
- root
| - colA
| - colB
| +-field1
| +-field2
O esquema após será:
- root
| - colA
| - colB
| +-field001
| +-field2
Confira Renomear e remover colunas usando o mapeamento de colunas do Delta Lake.
Atualizar explicitamente o esquema para remover colunas
Observação
Este recurso está disponível no Databricks Runtime 11.3 LTS e versões superiores.
Para remover colunas como uma operação somente de metadados sem reescrever nenhum arquivo de dados, você precisa habilitar o mapeamento de colunas para a tabela. Confira Renomear e remover colunas usando o mapeamento de colunas do Delta Lake.
Importante
A remoção de uma coluna dos metadados não exclui os dados subjacentes da coluna nos arquivos. Para limpar os dados de coluna removidos, você pode usar REORG TABLE para reescrever arquivos. Depois, você pode usar VACUUM para excluir fisicamente os arquivos que contêm os dados da coluna removida.
Para remover uma coluna:
ALTER TABLE table_name DROP COLUMN col_name
Para remover várias colunas:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Atualizar explicitamente o esquema para alterar o tipo de coluna
Você pode alterar o tipo ou nome da coluna ou removê-la regravando a tabela. Para fazer isso, use a opção overwriteSchema
.
O exemplo a seguir mostra a alteração de um tipo de coluna:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
O exemplo a seguir mostra a alteração de um nome de coluna:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Habilitar a evolução do esquema
Você pode habilitar a evolução do esquema seguindo um destes procedimentos:
- Defina o
.option("mergeSchema", "true")
como um DataFramewrite
ou operaçãowriteStream
do Spark. Consulte Habilitar a evolução do esquema para gravações para adicionar novas colunas. - Use a sintaxe
MERGE WITH SCHEMA EVOLUTION
. Confira Sintaxe de evolução de esquema para mesclagem. - Defina a configuração
spark.databricks.delta.schema.autoMerge.enabled
do Spark comotrue
para a SparkSession atual.
O Databricks recomenda habilitar a evolução do esquema para cada operação de gravação em vez de definir uma configuração do Spark.
Quando você usa opções ou sintaxe para habilitar a evolução do esquema em uma operação de gravação, isso tem precedência sobre a configuração do Spark.
Observação
Não há cláusula de evolução de esquema para instruções INSERT INTO
.
Habilitar a evolução do esquema para gravações para adicionar novas colunas
As colunas que estão presentes na consulta de origem, mas ausentes da tabela de destino, são automaticamente adicionadas como parte de uma transação de gravação quando a evolução de esquema é habilitada. Consulte Habilitar a evolução do esquema.
As maiúsculas e minúsculas são preservadas no acréscimo de uma nova coluna. Novas colunas são adicionadas ao final do esquema de tabelas. Se as colunas adicionais estiverem em um struct, elas serão acrescentadas ao final do struct na tabela de destino.
O exemplo a seguir demonstra como usar a opção mergeSchema
com o Carregador Automático. Confira O que é o Carregador Automático?.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
O exemplo a seguir demonstra o uso da opção mergeSchema
com uma operação de gravação em lote:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Evolução automática de esquema para mesclagem do Delta Lake
A evolução do esquema permite que os usuários resolve incompatibilidades de esquema entre o destino e a tabela de origem na mesclagem. Ele lida com os dois casos a seguir:
- Uma coluna na tabela de origem não está presente na tabela de destino. A nova coluna é adicionada ao esquema de destino e seus valores são inseridos ou atualizados usando os valores de origem.
- Uma coluna na tabela de destino não está presente na tabela de origem. O esquema de destino permanece inalterado; os valores na coluna de destino adicional são deixados inalterados (para
UPDATE
) ou definidos comoNULL
(paraINSERT
).
Você deve habilitar manualmente a evolução automática do esquema. Consulte Habilitar a evolução do esquema.
Observação
No Databricks Runtime 12.2 LTS e posteriores, os campos e colunas struct presentes na tabela de origem podem ser especificados pelo nome em ações de inserção ou atualização. No Databricks Runtime 11.3 LTS e abaixo, somente INSERT *
ou UPDATE SET *
ações podem ser usadas para evolução do esquema com mesclagem.
No Databricks Runtime 13.3 LTS e posteriores, você pode usar a evolução do esquema com structs aninhados dentro de mapas, como map<int, struct<a: int, b: int>>
.
Sintaxe de evolução de esquema para mesclagem
No Databricks Runtime 15.2 e superior, você pode especificar a evolução do esquema em uma instrução de mesclagem usando SQL ou APIs de tabela delta:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Exemplo de operações de mesclagem com evolução de esquema
Aqui estão alguns exemplos dos efeitos da operação merge
com e sem a evolução do esquema.
Colunas | Consulta (no SQL) | Comportamento sem a evolução do esquema (padrão) | Comportamento com a evolução do esquema |
---|---|---|---|
Colunas de destino: key, value Colunas de origem: key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
O esquema de tabela permanece inalterado; somente as colunas key , value são atualizadas/inseridas. |
O esquema da tabela é alterado para (key, value, new_value) . Os registros existentes com correspondências são atualizados com value e new_value na origem. Novas linhas são inseridas com o esquema (key, value, new_value) . |
Colunas de destino: key, old_value Colunas de origem: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
UPDATE e INSERT as ações geram um erro porque a coluna old_value de destino não está na origem. |
O esquema da tabela é alterado para (key, old_value, new_value) . Os registros existentes com correspondências são atualizados com o new_value na origem deixando old_value inalterado. Novos registros são inseridos com key , new_value e NULL especificados para o old_value . |
Colunas de destino: key, old_value Colunas de origem: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE gera um erro porque a coluna new_value não existe na tabela de destino. |
O esquema da tabela é alterado para (key, old_value, new_value) . Os registros existentes com correspondências são atualizados com o new_value na origem deixando old_value inalterado e registros não correspondentes foram NULL inseridos para new_value . Confira a observação (1). |
Colunas de destino: key, old_value Colunas de origem: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT gera um erro porque a coluna new_value não existe na tabela de destino. |
O esquema da tabela é alterado para (key, old_value, new_value) . Novos registros são inseridos com key , new_value e NULL especificados para o old_value . Os registros existentes foram NULL inseridos para new_value deixando old_value inalterados. Confira a observação (1). |
(1) Esse comportamento está disponível no Databricks Runtime 12.2 LTS e superior; Databricks Runtime 11.3 LTS e erro abaixo nessa condição.
Excluir colunas com mesclagem do Delta Lake
No Databricks Runtime 12.2 LTS e posteriores, você pode usar cláusulas EXCEPT
em condições de mesclagem para excluir explicitamente colunas. O comportamento da palavra-chave EXCEPT
varia dependendo se a evolução do esquema está habilitada ou não.
Com a evolução do esquema desabilitada, a palavra-chave EXCEPT
se aplica à lista de colunas na tabela de destino e permite a exclusão de colunas de ações UPDATE
ou INSERT
. As colunas excluídas são definidas como null
.
Com a evolução do esquema habilitada, a palavra-chave EXCEPT
se aplica à lista de colunas na tabela de origem e permite a exclusão de colunas da evolução do esquema. Uma nova coluna na origem que não esteja presente no destino não será adicionada ao esquema de destino se estiver listada na cláusula EXCEPT
. As colunas excluídas que já estão presentes no destino são definidas como null
.
Os exemplos a seguir demonstram essa sintaxe:
Colunas | Consulta (no SQL) | Comportamento sem a evolução do esquema (padrão) | Comportamento com a evolução do esquema |
---|---|---|---|
Colunas de destino: id, title, last_updated Colunas de origem: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
As linhas correspondentes são atualizadas definindo o campo last_updated como a data atual. Novas linhas são inseridas usando valores para id e title . O campo last_updated excluído é definido como null . O campo review é ignorado porque não está no destino. |
As linhas correspondentes são atualizadas definindo o campo last_updated como a data atual. O esquema é desenvolvido para adicionar o campo review . Novas linhas são inseridas usando todos os campos de origem, exceto last_updated , que é definida como null . |
Colunas de destino: id, title, last_updated Colunas de origem: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT gera um erro porque a coluna internal_count não existe na tabela de destino. |
As linhas correspondentes são atualizadas definindo o campo last_updated como a data atual. O campo review é adicionado à tabela de destino, mas o campo internal_count é ignorado. Novas linhas inseridas têm last_updated definida como null . |
Como lidar com colunas NullType
em atualizações de esquema
Como o Parquet não dá suporte a NullType
, as colunas NullType
são removidas do DataFrame quando gravadas em tabelas Delta, mas ainda ficam armazenadas no esquema. Quando um tipo de dados diferente é recebido para essa coluna, o Delta Lake mescla o esquema com o novo tipo de dados. Se o Delta Lake receber um NullType
para uma coluna existente, o esquema antigo será mantido e a nova coluna será removida durante a gravação.
Não há suporte a streaming de NullType
. Como você precisa definir esquemas ao usar streaming, isso deve acontecer muito raramente. NullType
também não é aceito para tipos complexos como ArrayType
e MapType
.
Esquema da tabela de substituição
Por padrão, a substituição dos dados em uma tabela não substitui o esquema. Ao substituir uma tabela usando mode("overwrite")
sem replaceWhere
, talvez seja melhor substituir o esquema dos dados que estão sendo gravados. Você substitui o esquema e o particionamento da tabela com a definição da opção overwriteSchema
como true
:
df.write.option("overwriteSchema", "true")
Importante
Você não pode especificar overwriteSchema
como true
ao usar a substituição de partição dinâmica.