Compartilhar via


Atualizar o esquema de tabela do Delta Lake

O Delta Lake permite atualizar o esquema de uma tabela. Há suporte aos seguintes tipos de alterações:

  • Adição de novas colunas (em posições arbitrárias)
  • Reordenação de colunas existentes
  • Renomeação de colunas existentes

Você pode fazer essas alterações explicitamente com DDL ou implicitamente com DML.

Importante

Uma atualização em um esquema de tabela Delta é uma operação que entra em conflito com todas as operações de gravação Delta simultâneas.

Quando você atualiza um esquema de tabela Delta, os fluxos que leem dessa tabela são terminados. Se você quiser que o fluxo continue, precisará reiniciá-lo. Para obter métodos recomendados, confira Considerações de produção para Streaming Estruturado.

Atualizar explicitamente o esquema para adicionar colunas

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Por padrão, a nulidade é true.

Para adicionar uma coluna a um campo aninhado, use:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Por exemplo, se o esquema anterior à execução de ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) for:

- root
| - colA
| - colB
| +-field1
| +-field2

o esquema após será:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Observação

A adição de colunas aninhadas tem suporte apenas para structs. Não há suporte a matrizes e mapas.

Atualizar explicitamente o esquema para alterar o comentário ou a ordenação da coluna

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Para alterar uma coluna em um campo aninhado, use:

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Por exemplo, se o esquema anterior à execução de ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST for:

- root
| - colA
| - colB
| +-field1
| +-field2

o esquema após será:

- root
| - colA
| - colB
| +-field2
| +-field1

Atualizar explicitamente o esquema para substituir colunas

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

Por exemplo, ao executar a seguinte DDL:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

Se o esquema antes for:

- root
| - colA
| - colB
| +-field1
| +-field2

o esquema após será:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

Atualizar explicitamente o esquema para renomear colunas

Observação

Este recurso está disponível no Databricks Runtime 10.4 LTS e versões superiores.

Para renomear colunas sem regravar os dados existentes de uma coluna, você precisará habilitar o mapeamento de colunas para a tabela. Confira Renomear e remover colunas usando o mapeamento de colunas do Delta Lake.

Para renomear uma coluna:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

Para renomear um campo aninhado:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

Por exemplo, ao executar o seguinte comando:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

Se o esquema antes for:

- root
| - colA
| - colB
| +-field1
| +-field2

O esquema após será:

- root
| - colA
| - colB
| +-field001
| +-field2

Confira Renomear e remover colunas usando o mapeamento de colunas do Delta Lake.

Atualizar explicitamente o esquema para remover colunas

Observação

Este recurso está disponível no Databricks Runtime 11.3 LTS e versões superiores.

Para remover colunas como uma operação somente de metadados sem reescrever nenhum arquivo de dados, você precisa habilitar o mapeamento de colunas para a tabela. Confira Renomear e remover colunas usando o mapeamento de colunas do Delta Lake.

Importante

A remoção de uma coluna dos metadados não exclui os dados subjacentes da coluna nos arquivos. Para limpar os dados de coluna removidos, você pode usar REORG TABLE para reescrever arquivos. Depois, você pode usar VACUUM para excluir fisicamente os arquivos que contêm os dados da coluna removida.

Para remover uma coluna:

ALTER TABLE table_name DROP COLUMN col_name

Para remover várias colunas:

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

Atualizar explicitamente o esquema para alterar o tipo de coluna

Você pode alterar o tipo ou nome da coluna ou removê-la regravando a tabela. Para fazer isso, use a opção overwriteSchema.

O exemplo a seguir mostra a alteração de um tipo de coluna:

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

O exemplo a seguir mostra a alteração de um nome de coluna:

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Habilitar a evolução do esquema

Você pode habilitar a evolução do esquema seguindo um destes procedimentos:

O Databricks recomenda habilitar a evolução do esquema para cada operação de gravação em vez de definir uma configuração do Spark.

Quando você usa opções ou sintaxe para habilitar a evolução do esquema em uma operação de gravação, isso tem precedência sobre a configuração do Spark.

Observação

Não há cláusula de evolução de esquema para instruções INSERT INTO.

Habilitar a evolução do esquema para gravações para adicionar novas colunas

As colunas que estão presentes na consulta de origem, mas ausentes da tabela de destino, são automaticamente adicionadas como parte de uma transação de gravação quando a evolução de esquema é habilitada. Consulte Habilitar a evolução do esquema.

As maiúsculas e minúsculas são preservadas no acréscimo de uma nova coluna. Novas colunas são adicionadas ao final do esquema de tabelas. Se as colunas adicionais estiverem em um struct, elas serão acrescentadas ao final do struct na tabela de destino.

O exemplo a seguir demonstra como usar a opção mergeSchema com o Carregador Automático. Confira O que é o Carregador Automático?.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

O exemplo a seguir demonstra o uso da opção mergeSchema com uma operação de gravação em lote:

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

Evolução automática de esquema para mesclagem do Delta Lake

A evolução do esquema permite que os usuários resolve incompatibilidades de esquema entre o destino e a tabela de origem na mesclagem. Ele lida com os dois casos a seguir:

  1. Uma coluna na tabela de origem não está presente na tabela de destino. A nova coluna é adicionada ao esquema de destino e seus valores são inseridos ou atualizados usando os valores de origem.
  2. Uma coluna na tabela de destino não está presente na tabela de origem. O esquema de destino permanece inalterado; os valores na coluna de destino adicional são deixados inalterados (para UPDATE) ou definidos como NULL (para INSERT).

Você deve habilitar manualmente a evolução automática do esquema. Consulte Habilitar a evolução do esquema.

Observação

No Databricks Runtime 12.2 LTS e posteriores, os campos e colunas struct presentes na tabela de origem podem ser especificados pelo nome em ações de inserção ou atualização. No Databricks Runtime 11.3 LTS e abaixo, somente INSERT * ou UPDATE SET * ações podem ser usadas para evolução do esquema com mesclagem.

No Databricks Runtime 13.3 LTS e posteriores, você pode usar a evolução do esquema com structs aninhados dentro de mapas, como map<int, struct<a: int, b: int>>.

Sintaxe de evolução de esquema para mesclagem

No Databricks Runtime 15.2 e superior, você pode especificar a evolução do esquema em uma instrução de mesclagem usando SQL ou APIs de tabela delta:

SQL

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Python

from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

Exemplo de operações de mesclagem com evolução de esquema

Aqui estão alguns exemplos dos efeitos da operação merge com e sem a evolução do esquema.

Colunas Consulta (no SQL) Comportamento sem a evolução do esquema (padrão) Comportamento com a evolução do esquema
Colunas de destino: key, value

Colunas de origem: key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
O esquema de tabela permanece inalterado; somente as colunas key, value são atualizadas/inseridas. O esquema da tabela é alterado para (key, value, new_value). Os registros existentes com correspondências são atualizados com value e new_value na origem. Novas linhas são inseridas com o esquema (key, value, new_value).
Colunas de destino: key, old_value

Colunas de origem: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
UPDATE e INSERT as ações geram um erro porque a coluna old_value de destino não está na origem. O esquema da tabela é alterado para (key, old_value, new_value). Os registros existentes com correspondências são atualizados com o new_value na origem deixando old_value inalterado. Novos registros são inseridos com key, new_value e NULL especificados para o old_value.
Colunas de destino: key, old_value

Colunas de origem: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE gera um erro porque a coluna new_value não existe na tabela de destino. O esquema da tabela é alterado para (key, old_value, new_value). Os registros existentes com correspondências são atualizados com o new_value na origem deixando old_value inalterado e registros não correspondentes foram NULL inseridos para new_value. Confira a observação (1).
Colunas de destino: key, old_value

Colunas de origem: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT gera um erro porque a coluna new_value não existe na tabela de destino. O esquema da tabela é alterado para (key, old_value, new_value). Novos registros são inseridos com key, new_value e NULL especificados para o old_value. Os registros existentes foram NULL inseridos para new_value deixando old_value inalterados. Confira a observação (1).

(1) Esse comportamento está disponível no Databricks Runtime 12.2 LTS e superior; Databricks Runtime 11.3 LTS e erro abaixo nessa condição.

Excluir colunas com mesclagem do Delta Lake

No Databricks Runtime 12.2 LTS e posteriores, você pode usar cláusulas EXCEPT em condições de mesclagem para excluir explicitamente colunas. O comportamento da palavra-chave EXCEPT varia dependendo se a evolução do esquema está habilitada ou não.

Com a evolução do esquema desabilitada, a palavra-chave EXCEPT se aplica à lista de colunas na tabela de destino e permite a exclusão de colunas de ações UPDATE ou INSERT. As colunas excluídas são definidas como null.

Com a evolução do esquema habilitada, a palavra-chave EXCEPT se aplica à lista de colunas na tabela de origem e permite a exclusão de colunas da evolução do esquema. Uma nova coluna na origem que não esteja presente no destino não será adicionada ao esquema de destino se estiver listada na cláusula EXCEPT. As colunas excluídas que já estão presentes no destino são definidas como null.

Os exemplos a seguir demonstram essa sintaxe:

Colunas Consulta (no SQL) Comportamento sem a evolução do esquema (padrão) Comportamento com a evolução do esquema
Colunas de destino: id, title, last_updated

Colunas de origem: id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
As linhas correspondentes são atualizadas definindo o campo last_updated como a data atual. Novas linhas são inseridas usando valores para id e title. O campo last_updated excluído é definido como null. O campo review é ignorado porque não está no destino. As linhas correspondentes são atualizadas definindo o campo last_updated como a data atual. O esquema é desenvolvido para adicionar o campo review. Novas linhas são inseridas usando todos os campos de origem, exceto last_updated, que é definida como null.
Colunas de destino: id, title, last_updated

Colunas de origem: id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT gera um erro porque a coluna internal_count não existe na tabela de destino. As linhas correspondentes são atualizadas definindo o campo last_updated como a data atual. O campo review é adicionado à tabela de destino, mas o campo internal_count é ignorado. Novas linhas inseridas têm last_updated definida como null.

Como lidar com colunas NullType em atualizações de esquema

Como o Parquet não dá suporte a NullType, as colunas NullType são removidas do DataFrame quando gravadas em tabelas Delta, mas ainda ficam armazenadas no esquema. Quando um tipo de dados diferente é recebido para essa coluna, o Delta Lake mescla o esquema com o novo tipo de dados. Se o Delta Lake receber um NullType para uma coluna existente, o esquema antigo será mantido e a nova coluna será removida durante a gravação.

Não há suporte a streaming de NullType. Como você precisa definir esquemas ao usar streaming, isso deve acontecer muito raramente. NullType também não é aceito para tipos complexos como ArrayType e MapType.

Esquema da tabela de substituição

Por padrão, a substituição dos dados em uma tabela não substitui o esquema. Ao substituir uma tabela usando mode("overwrite") sem replaceWhere, talvez seja melhor substituir o esquema dos dados que estão sendo gravados. Você substitui o esquema e o particionamento da tabela com a definição da opção overwriteSchema como true:

df.write.option("overwriteSchema", "true")

Importante

Você não pode especificar overwriteSchema como true ao usar a substituição de partição dinâmica.