Partilhar via


Melhores práticas: Delta Lake

Este artigo descreve as práticas recomendadas ao usar o Delta Lake.

A Databricks recomenda o uso da otimização preditiva. Consulte Otimização preditiva para Delta Lake.

Ao excluir e recriar uma tabela no mesmo local, você sempre deve usar uma CREATE OR REPLACE TABLE instrução. Consulte Soltar ou substituir uma tabela Delta.

Use clustering líquido para otimizar o salto de dados

O Databricks recomenda o uso de clustering líquido em vez de particionamento, ordem Z ou outras estratégias de organização de dados para otimizar o layout de dados para pular dados. Veja Utilizar clustering líquido para tabelas Delta.

Arquivos compactos

A otimização preditiva é executada OPTIMIZE automaticamente e VACUUM comanda as tabelas gerenciadas do Unity Catalog. Consulte Otimização preditiva para Delta Lake.

O Databricks recomenda executar frequentemente o comando OTIMIZE para compactar arquivos pequenos.

Nota

Esta operação não remove os ficheiros antigos. Para removê-los, execute o comando VACUUM .

Substituir o conteúdo ou esquema de uma tabela

Às vezes, você pode querer substituir uma tabela Delta. Por exemplo:

  • Você descobre que os dados na tabela estão incorretos e deseja substituir o conteúdo.
  • Você deseja reescrever toda a tabela para fazer alterações de esquema incompatíveis (como alterar tipos de coluna).

Embora você possa excluir todo o diretório de uma tabela Delta e criar uma nova tabela no mesmo caminho, isso não é recomendado porque:

  • Excluir um diretório não é eficiente. Um diretório contendo arquivos muito grandes pode levar horas ou até dias para ser excluído.
  • Você perde todo o conteúdo dos arquivos excluídos; é difícil de recuperar se você excluir a tabela errada.
  • A exclusão do diretório não é atômica. Enquanto você estiver excluindo a tabela, uma consulta simultânea lendo a tabela pode falhar ou ver uma tabela parcial.

Se não for necessário alterar o esquema da tabela, você poderá excluir dados de uma tabela Delta e inserir novos dados ou atualizar a tabela para corrigir os valores incorretos.

Se quiser alterar o esquema da tabela, você pode substituir a tabela inteira atomicamente. Por exemplo:

Python

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table

SQL

REPLACE TABLE <your-table> USING DELTA AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA LOCATION "<your-table-path>" AS SELECT ... -- External table

Scala

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

Há vários benefícios com essa abordagem:

  • Substituir uma tabela é muito mais rápido porque não precisa listar o diretório recursivamente ou excluir nenhum arquivo.
  • A versão antiga da tabela ainda existe. Se você excluir a tabela errada, poderá recuperar facilmente os dados antigos usando viagens no tempo. Veja Trabalhar com o histórico de tabelas do Delta Lake.
  • É uma operação atómica. As consultas simultâneas ainda podem ler a tabela enquanto você a exclui.
  • Devido às garantias de transação Delta Lake ACID, se a substituição da tabela falhar, a tabela estará em seu estado anterior.

Além disso, se você quiser excluir arquivos antigos para economizar custos de armazenamento depois de substituir a tabela, você pode usar o VACUUM para excluí-los. Ele é otimizado para exclusão de arquivos e geralmente é mais rápido do que excluir o diretório inteiro.

Cache de faísca

O Databricks não recomenda que você use o cache do Spark pelos seguintes motivos:

  • Você perde qualquer salto de dados que possa vir de filtros adicionais adicionados sobre o cache DataFrame.
  • Os dados armazenados em cache podem não ser atualizados se a tabela for acessada usando um identificador diferente.

Diferenças entre Delta Lake e Parquet no Apache Spark

O Delta Lake lida com as seguintes operações automaticamente. Nunca deve executar estas operações manualmente:

  • REFRESH TABLE: As tabelas delta sempre retornam as informações mais atualizadas, portanto, não há necessidade de ligar REFRESH TABLE manualmente após as alterações.
  • Adicionar e remover partições: o Delta Lake rastreia automaticamente o conjunto de partições presentes em uma tabela e atualiza a lista à medida que os dados são adicionados ou removidos. Como resultado, não há necessidade de executar ALTER TABLE [ADD|DROP] PARTITION ou MSCK.
  • Carregar uma única partição: Não é necessário ler partições diretamente. Por exemplo, você não precisa executar spark.read.format("parquet").load("/data/date=2017-01-01")o . Em vez disso, use uma WHERE cláusula para pular dados, como spark.read.table("<table-name>").where("date = '2017-01-01'").
  • Não modifique manualmente os arquivos de dados: o Delta Lake usa o log de transações para confirmar alterações na tabela atomicamente. Não modifique, adicione ou exclua diretamente arquivos de dados do Parquet em uma tabela Delta, pois isso pode levar à perda de dados ou à corrupção da tabela.

Melhorar o desempenho da fusão Delta Lake

Você pode reduzir o tempo necessário para mesclar usando as seguintes abordagens:

  • Reduzir o espaço de pesquisa para correspondências: Por padrão, a operação pesquisa toda a merge tabela Delta para encontrar correspondências na tabela de origem. Uma maneira de acelerar merge é reduzir o espaço de pesquisa adicionando restrições conhecidas na condição de correspondência. Por exemplo, suponha que você tenha uma tabela particionada por country e date que deseja usar merge para atualizar as informações do último dia e de um país específico. Adicionar a seguinte condição torna a consulta mais rápida, pois procura correspondências apenas nas partições relevantes:

    events.date = current_date() AND events.country = 'USA'
    

    Além disso, essa consulta também reduz as chances de conflitos com outras operações simultâneas. Consulte Níveis de isolamento e conflitos de gravação no Azure Databricks para obter mais detalhes.

  • Arquivos compactos: Se os dados forem armazenados em muitos arquivos pequenos, a leitura dos dados para procurar correspondências pode ficar lenta. Você pode compactar arquivos pequenos em arquivos maiores para melhorar a taxa de transferência de leitura. Consulte Arquivos de dados compactos com otimização no Delta Lake para obter detalhes.

  • Controle as partições aleatórias para gravações: A merge operação embaralha dados várias vezes para calcular e gravar os dados atualizados. O número de tarefas usadas para embaralhar é controlado pela configuração spark.sql.shuffle.partitionsda sessão do Spark. A definição desse parâmetro não só controla o paralelismo, mas também determina o número de arquivos de saída. Aumentar o valor aumenta o paralelismo, mas também gera um número maior de arquivos de dados menores.

  • Ativar gravações otimizadas: Para tabelas particionadas, merge pode produzir um número muito maior de arquivos pequenos do que o número de partições aleatórias. Isso ocorre porque cada tarefa aleatória pode gravar vários arquivos em várias partições e pode se tornar um gargalo de desempenho. Você pode reduzir o número de arquivos habilitando gravações otimizadas. Consulte Gravações otimizadas para Delta Lake no Azure Databricks.

  • Ajustar tamanhos de arquivo na tabela: o Azure Databricks pode detetar automaticamente se uma tabela Delta tem operações frequentes merge que reescrevem arquivos e pode optar por reduzir o tamanho de arquivos regravados em antecipação a novas regravações de arquivos no futuro. Consulte a seção sobre ajuste de tamanhos de arquivo para obter detalhes.

  • Low Shuffle Merge: Low Shuffle Merge fornece uma implementação otimizada que fornece melhor desempenho para as cargas de MERGE trabalho mais comuns. Além disso, preserva otimizações de layout de dados existentes, como ordenação Z em dados não modificados.

Gerenciar a recenticidade dos dados

No início de cada consulta, as tabelas Delta são atualizadas automaticamente para a versão mais recente da tabela. Este processo pode ser observado em blocos de anotações quando o status do comando relata: Updating the Delta table's state. No entanto, ao executar a análise histórica em uma tabela, você pode não precisar necessariamente de dados atualizados de última hora, especialmente para tabelas em que os dados de streaming estão sendo ingeridos com frequência. Nesses casos, as consultas podem ser executadas em instantâneos obsoletos da sua tabela Delta. Essa abordagem pode reduzir a latência na obtenção de resultados de consultas.

Você pode configurar a tolerância para dados obsoletos definindo a configuração spark.databricks.delta.stalenessLimit da sessão do Spark com um valor de cadeia de caracteres de tempo como 1h ou 15m (por 1 hora ou 15 minutos, respectivamente). Essa configuração é específica da sessão e não afeta outros clientes que acessam a tabela. Se o estado da tabela tiver sido atualizado dentro do limite de obsolescência, uma consulta na tabela retornará resultados sem esperar pela atualização mais recente da tabela. Essa configuração nunca impede que sua tabela seja atualizada e, quando dados obsoletos são retornados, a atualização é processada em segundo plano. Se a última atualização da tabela for mais antiga do que o limite de obsoleção, a consulta não retornará resultados até que a atualização do estado da tabela seja concluída.

Pontos de verificação aprimorados para consultas de baixa latência

Delta Lake grava pontos de verificação como um estado agregado de uma tabela Delta em uma frequência otimizada. Esses pontos de verificação servem como ponto de partida para calcular o estado mais recente da tabela. Sem pontos de verificação, o Delta Lake teria que ler uma grande coleção de arquivos JSON ("arquivos delta") representando confirmações no log de transações para calcular o estado de uma tabela. Além disso, as estatísticas de nível de coluna que o Delta Lake usa para executar o salto de dados são armazenadas no ponto de verificação.

Importante

Os pontos de verificação Delta Lake são diferentes dos pontos de verificação de Streaming Estruturado.

As estatísticas em nível de coluna são armazenadas como um struct e um JSON (para compatibilidade com versões anteriores). O formato struct torna a leitura do Delta Lake muito mais rápida, porque:

  • O Delta Lake não executa análises JSON caras para obter estatísticas no nível da coluna.
  • Os recursos de poda de coluna de parquet reduzem significativamente a E/S necessária para ler as estatísticas de uma coluna.

O formato struct permite uma coleção de otimizações que reduzem a sobrecarga das operações de leitura do Delta Lake de segundos para dezenas de milissegundos, o que reduz significativamente a latência para consultas curtas.

Gerenciar estatísticas em nível de coluna em pontos de verificação

Você gerencia como as estatísticas são escritas em pontos de verificação usando as propriedades delta.checkpoint.writeStatsAsJson da tabela e delta.checkpoint.writeStatsAsStruct. Se ambas as propriedades da tabela forem false, o Delta Lake não poderá executar o salto de dados.

  • O Batch grava estatísticas de gravação no formato JSON e struct. delta.checkpoint.writeStatsAsJson é true.
  • delta.checkpoint.writeStatsAsStruct é indefinido por padrão.
  • Os leitores usam a coluna struct quando disponível e, caso contrário, voltam a usar a coluna JSON.

Importante

Pontos de verificação aprimorados não quebram a compatibilidade com leitores Delta Lake de código aberto. No entanto, a configuração delta.checkpoint.writeStatsAsJson pode false ter implicações nos leitores proprietários do Delta Lake. Entre em contato com seus fornecedores para saber mais sobre as implicações de desempenho.

Habilitar pontos de verificação aprimorados para consultas de Streaming Estruturado

Se suas cargas de trabalho de Streaming Estruturado não tiverem requisitos de baixa latência (latências de subminuto), você poderá habilitar pontos de verificação aprimorados executando o seguinte comando SQL:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

Você também pode melhorar a latência de gravação do ponto de verificação definindo as seguintes propriedades da tabela:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

Se o salto de dados não for útil em seu aplicativo, você poderá definir ambas as propriedades como false. Em seguida, nenhuma estatística é coletada ou escrita. O Databricks não recomenda essa configuração.