Partilhar via


Atualizar o esquema de tabelas do Delta Lake

O Delta Lake permite-lhe atualizar o esquema de uma tabela. São suportados os seguintes tipos de alterações:

  • Adicionar novas colunas (em posições arbitrárias)
  • Reordenar colunas existentes
  • Renomeando colunas existentes

Pode efetuar estas alterações explicitamente através de DDL ou implicitamente através de DML.

Importante

Uma atualização para um esquema de tabela Delta é uma operação que entra em conflito com todas as operações de gravação Delta simultâneas.

Quando actualiza um esquema de tabela Delta, os fluxos que lêem a partir desta tabela terminam. Se quiser que o fluxo continue, tem de reiniciá-lo. Para obter os métodos recomendados, consulte Considerações de produção para streaming estruturado.

Atualizar explicitamente o esquema para adicionar colunas

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Por padrão, a anulabilidade é true.

Para adicionar uma coluna a um campo aninhado, use:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Por exemplo, se o esquema antes da execução ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) for:

- root
| - colA
| - colB
| +-field1
| +-field2

O esquema seguinte é:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Nota

A adição de colunas aninhadas é suportada apenas para structs. Não há suporte para matrizes e mapas.

Atualizar explicitamente o esquema para alterar o comentário ou a ordenação das colunas

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Para alterar uma coluna em um campo aninhado, use:

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Por exemplo, se o esquema antes da execução ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST for:

- root
| - colA
| - colB
| +-field1
| +-field2

O esquema seguinte é:

- root
| - colA
| - colB
| +-field2
| +-field1

Atualizar explicitamente o esquema para substituir colunas

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

Por exemplo, ao executar a seguinte DDL:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

se o esquema anterior for:

- root
| - colA
| - colB
| +-field1
| +-field2

O esquema seguinte é:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

Atualizar explicitamente o esquema para renomear colunas

Nota

Esse recurso está disponível no Databricks Runtime 10.4 LTS e superior.

Para renomear colunas sem reescrever nenhum dos dados existentes das colunas, você deve habilitar o mapeamento de colunas para a tabela. Consulte Renomear e soltar colunas com o mapeamento de colunas Delta Lake.

Para renomear uma coluna:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

Para renomear um campo aninhado:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

Por exemplo, quando você executa o seguinte comando:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

Se o esquema anterior for:

- root
| - colA
| - colB
| +-field1
| +-field2

Em seguida, o esquema seguinte é:

- root
| - colA
| - colB
| +-field001
| +-field2

Consulte Renomear e soltar colunas com o mapeamento de colunas Delta Lake.

Atualizar explicitamente o esquema para soltar colunas

Nota

Esse recurso está disponível no Databricks Runtime 11.3 LTS e superior.

Para soltar colunas como uma operação somente de metadados sem reescrever nenhum arquivo de dados, você deve habilitar o mapeamento de colunas para a tabela. Consulte Renomear e soltar colunas com o mapeamento de colunas Delta Lake.

Importante

Soltar uma coluna dos metadados não exclui os dados subjacentes da coluna nos arquivos. Para limpar os dados da coluna descartada, você pode usar REORG TABLE para reescrever arquivos. Em seguida, você pode usar VACUUM para excluir fisicamente os arquivos que contêm os dados da coluna descartada.

Para soltar uma coluna:

ALTER TABLE table_name DROP COLUMN col_name

Para soltar várias colunas:

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

Atualizar explicitamente o esquema para alterar o tipo ou o nome da coluna

Você pode alterar o tipo ou o nome de uma coluna ou soltá-la reescrevendo a tabela. Para fazer isso, use a overwriteSchema opção.

O exemplo a seguir mostra a alteração de um tipo de coluna:

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

O exemplo a seguir mostra a alteração do nome de uma coluna:

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Habilitar a evolução do esquema

Você pode habilitar a evolução do esquema seguindo um destes procedimentos:

O Databricks recomenda habilitar a evolução do esquema para cada operação de gravação em vez de definir um conf do Spark.

Quando você usa opções ou sintaxe para habilitar a evolução do esquema em uma operação de gravação, isso tem precedência sobre a conf do Spark.

Nota

Não há cláusula de evolução de esquema para INSERT INTO instruções.

Habilitar a evolução do esquema para gravações para adicionar novas colunas

As colunas que estão presentes na consulta de origem, mas ausentes da tabela de destino, são adicionadas automaticamente como parte de uma transação de gravação quando a evolução do esquema está habilitada. Consulte Ativar evolução do esquema.

As maiúsculas e minúsculas são preservadas ao acrescentar uma nova coluna. Novas colunas são adicionadas ao final do esquema da tabela. Se as colunas adicionais estiverem em uma struct, elas serão acrescentadas ao final da struct na tabela de destino.

O exemplo a seguir demonstra o uso da opção com o mergeSchema Auto Loader. Consulte O que é Auto Loader?.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

O exemplo a seguir demonstra o uso da mergeSchema opção com uma operação de gravação em lote:

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

Evolução automática do esquema para a fusão Delta Lake

A evolução do esquema permite que os usuários resolvam incompatibilidades de esquema entre a tabela de destino e de origem na mesclagem. Trata dos dois casos seguintes:

  1. Uma coluna na tabela de origem não está presente na tabela de destino. A nova coluna é adicionada ao esquema de destino e seus valores são inseridos ou atualizados usando os valores de origem.
  2. Uma coluna na tabela de destino não está presente na tabela de origem. O esquema de destino é deixado inalterado; Os valores na coluna de destino adicional são mantidos inalterados (para UPDATE) ou definidos como NULL (para INSERT).

Você deve habilitar manualmente a evolução automática do esquema. Consulte Ativar evolução do esquema.

Nota

No Databricks Runtime 12.2 LTS e superior, as colunas e os campos struct presentes na tabela de origem podem ser especificados pelo nome nas ações de inserção ou atualização. No Databricks Runtime 11.3 LTS e inferior, apenas INSERT * ações ou UPDATE SET * podem ser usadas para evolução de esquema com mesclagem.

No Databricks Runtime 13.3 LTS e superior, você pode usar a evolução do esquema com estruturas aninhadas dentro de mapas, como map<int, struct<a: int, b: int>>.

Sintaxe de evolução do esquema para mesclagem

No Databricks Runtime 15.2 e superior, você pode especificar a evolução do esquema em uma instrução de mesclagem usando APIs de tabela SQL ou Delta:

SQL

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Python

from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

Exemplo de operações de mesclagem com evolução de esquema

Aqui estão alguns exemplos dos efeitos da operação com e sem evolução do merge esquema.

Colunas Consulta (em SQL) Comportamento sem evolução do esquema (padrão) Comportamento com evolução do esquema
Colunas de destino: key, value

Colunas de origem: key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
O esquema da tabela permanece inalterado; apenas as colunas key, value são atualizadas/inseridas. O esquema da tabela é alterado para (key, value, new_value). Os registros existentes com correspondências são atualizados com o value e new_value na fonte. Novas linhas são inseridas com o esquema (key, value, new_value).
Colunas de destino: key, old_value

Colunas de origem: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
UPDATE e INSERT as ações geram um erro porque a coluna old_value de destino não está na origem. O esquema da tabela é alterado para (key, old_value, new_value). Os registros existentes com correspondências são atualizados com o new_value na fonte deixando old_value inalterado. Novos registros são inseridos com o especificado key, , e NULL para o old_valuenew_value.
Colunas de destino: key, old_value

Colunas de origem: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE lança um erro porque a coluna new_value não existe na tabela de destino. O esquema da tabela é alterado para (key, old_value, new_value). Os registros existentes com correspondências são atualizados com o new_value na fonte deixando old_value inalterado, e registros incomparáveis foram NULL inseridos para new_value. Ver nota (1).
Colunas de destino: key, old_value

Colunas de origem: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT lança um erro porque a coluna new_value não existe na tabela de destino. O esquema da tabela é alterado para (key, old_value, new_value). Novos registros são inseridos com o especificado key, , e NULL para o old_valuenew_value. Os registos existentes foram NULL introduzidos para new_value permanecerem old_value inalterados. Ver nota (1).

(1) Esse comportamento está disponível no Databricks Runtime 12.2 LTS e superior; Databricks Runtime 11.3 LTS e abaixo erro nesta condição.

Excluir colunas com mesclagem Delta Lake

No Databricks Runtime 12.2 LTS e superior, você pode usar EXCEPT cláusulas em condições de mesclagem para excluir explicitamente colunas. O comportamento da palavra-chave varia dependendo se a EXCEPT evolução do esquema está habilitada ou não.

Com a evolução do esquema desativada, a EXCEPT palavra-chave se aplica à lista de colunas na tabela de destino e permite excluir colunas de UPDATE ou INSERT ações. As colunas excluídas são definidas como null.

Com a evolução do esquema habilitada, a EXCEPT palavra-chave se aplica à lista de colunas na tabela de origem e permite excluir colunas da evolução do esquema. Uma nova coluna na origem que não está presente no destino não é adicionada ao esquema de destino se estiver listada EXCEPT na cláusula. As colunas excluídas que já estão presentes no destino são definidas como null.

Os exemplos a seguir demonstram essa sintaxe:

Colunas Consulta (em SQL) Comportamento sem evolução do esquema (padrão) Comportamento com evolução do esquema
Colunas de destino: id, title, last_updated

Colunas de origem: id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
As linhas correspondentes são atualizadas definindo o last_updated campo para a data atual. Novas linhas são inseridas usando valores para id e title. O campo last_updated excluído está definido como null. O campo review é ignorado porque não está no alvo. As linhas correspondentes são atualizadas definindo o last_updated campo para a data atual. O esquema é evoluído para adicionar o campo review. Novas linhas são inseridas usando todos os campos de origem, exceto last_updated o que está definido como null.
Colunas de destino: id, title, last_updated

Colunas de origem: id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT lança um erro porque a coluna internal_count não existe na tabela de destino. As linhas correspondentes são atualizadas definindo o last_updated campo para a data atual. O review campo é adicionado à tabela de destino, mas o internal_count campo é ignorado. As novas linhas inseridas foram last_updated definidas como null.

Lidando com NullType colunas em atualizações de esquema

Como o Parquet não oferece suporte a NullType, NullType as colunas são descartadas do DataFrame ao gravar em tabelas Delta, mas ainda são armazenadas no esquema. Quando um tipo de dados diferente é recebido para essa coluna, o Delta Lake mescla o esquema com o novo tipo de dados. Se o Delta Lake receber um NullType para uma coluna existente, o esquema antigo será mantido e a nova coluna será descartada durante a gravação.

NullType em streaming não é suportado. Como você deve definir esquemas ao usar o streaming, isso deve ser muito raro. NullType também não é aceite para tipos complexos como ArrayType e MapType.

Substituir esquemas de tabelas

Por padrão, a substituição dos dados em uma tabela não substitui o esquema. Ao substituir uma tabela usando mode("overwrite") sem replaceWhere, você ainda pode querer substituir o esquema dos dados que estão sendo gravados. Substitua o esquema e o particionamento da tabela definindo a overwriteSchema opção como true:

df.write.option("overwriteSchema", "true")

Importante

Não é possível especificar overwriteSchema como true ao usar a substituição de partição dinâmica.