Substituir dados seletivamente com o Delta Lake

O Azure Databricks aproveita a funcionalidade do Delta Lake para dar suporte a duas opções distintas para substituições seletivas:

  • A opção replaceWhere substitui atomicamente todos os registros que correspondem a um determinado predicado.
  • Você pode substituir diretórios de dados com base em como as tabelas são particionadas usando substituições de partição dinâmica.

Para a maioria das operações, o Databricks recomenda usar replaceWhere para especificar quais dados substituir.

Importante

Se os dados forem substituídos acidentalmente, você pode restaurar para desfazer a alteração.

Substituição seletiva arbitrária com replaceWhere

Você pode substituir seletivamente apenas os dados que correspondem a uma expressão arbitrária.

Observação

O SQL requer o Databricks Runtime 12.2 LTS ou superior.

O comando a seguir substitui atomicamente os eventos em janeiro na tabela de destino, que é particionada por start_date, com dados em replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Este código de exemplo grava os dados em replace_data, valida se todas as linhas correspondem ao predicado e executa uma substituição atômica usando a semântica overwrite. Se algum valor na operação estiver fora da restrição, ocorrerá uma falha nessa operação com um erro por padrão.

Você pode alterar esse comportamento para valores overwrite dentro do intervalo do predicado e os registros insert que ficam fora do intervalo especificado. Para fazer isso, desabilite a verificação de restrição definindo spark.databricks.delta.replaceWhere.constraintCheck.enabled como false usando uma das seguintes configurações:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Comportamento herdado

O comportamento padrão herdado fez com que replaceWhere sobrescrevesse os dados correspondentes a um predicado apenas sobre colunas de partição. Com esse modelo herdado, o comando a seguir substituiria atomicamente o mês de janeiro na tabela de destino, que é particionada por date, pelos dados em df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

Se quiser voltar ao comportamento antigo, você poderá desabilitar o sinalizador spark.databricks.delta.replaceWhere.dataColumns.enabled:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Substituições de partições dinâmicas

Importante

Esse recurso está em uma versão prévia.

O Databricks Runtime 11.3 LTS e superior dá suporte ao modo de substituição de partição dinâmico para tabelas particionadas. Para tabelas com várias partições, o Databricks Runtime 11.3 LTS e versões anteriores dão suporte apenas a substituição dinâmica de partições se todas as colunas de partição forem do mesmo tipo de dados.

Quando no modo de substituição de partição dinâmica, as operações substituem todos os dados existentes em cada partição lógica para a qual a gravação confirma novos dados. Todas as partições lógicas existentes para as quais a gravação não contém dados permanecem inalteradas. Esse modo só é aplicável quando os dados estão sendo gravados no modo de substituição: INSERT OVERWRITE no SQL ou em uma gravaçã de DataFrame com df.write.mode("overwrite").

Configure o modo de substituição de partição dinâmica definindo a configuração de sessão do Spark spark.sql.sources.partitionOverwriteMode como dynamic. Você também pode habilitar isso definindo a opção partitionOverwriteMode de DataFrameWriter como dynamic. Se presente, a opção específica da consulta substitui o modo definido na configuração da sessão. O padrão para partitionOverwriteMode é static.

Importante

Valide se os dados gravados com a substituição de partição dinâmica chegam apenas nas partições esperadas. Uma linha na partição incorreta pode levar à substituição acidental de uma partição inteira.

O exemplo a seguir demonstra o uso de substituições de partição dinâmica:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Observação

  • A substituição de partição dinâmica entra em conflito com a opção replaceWhere para tabelas particionadas.
    • Se a substituição de partição dinâmica for habilitada na configuração de sessão do Spark e replaceWhere for fornecido como uma opção de DataFrameWriter, o Delta Lake substituirá os dados de acordo com a expressão replaceWhere (opções específicas de consulta substituem configurações de sessão).
    • Você receberá um erro se as opções DataFrameWriter tiverem a substituição de partição dinâmica e replaceWhere habilitados.
  • Você não pode especificar overwriteSchema como true ao usar a substituição de partição dinâmica.