Trabalhar com o histórico da tabela Delta Lake
Cada operação que modifica uma tabela Delta Lake cria uma nova versão da tabela. Você pode usar as informações do histórico para auditar operações, reverter uma tabela ou consultar uma tabela em um ponto específico no tempo usando viagens no tempo.
Nota
O Databricks não recomenda a utilização do histórico de tabelas do Delta Lake como uma solução de cópia de segurança de longo prazo para o arquivo de dados. O Databricks recomenda a utilização apenas dos últimos 7 dias para operações de viagem no tempo, a menos que tenha definido as configurações de retenção de dados e registos para um valor maior.
Obter o histórico da tabela Delta
Você pode recuperar informações, incluindo as operações, o usuário e o carimbo de data/hora para cada gravação em uma tabela Delta executando o history
comando. As operações são retornadas em ordem cronológica inversa.
A retenção do histórico da tabela é determinada pela configuração delta.logRetentionDuration
da tabela, que é de 30 dias por padrão.
Nota
As viagens no tempo e o histórico de tabelas são controlados por diferentes limiares de retenção. Veja O que é a viagem no tempo do Delta Lake?
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Para obter detalhes da sintaxe do Spark SQL, consulte DESCRIBE HISTORY.
Consulte a documentação da API Delta Lake para obter detalhes da sintaxe Scala/Java/Python.
O Catalog Explorer fornece uma exibição visual dessas informações detalhadas da tabela e do histórico das tabelas Delta. Além do esquema da tabela e dos dados de exemplo, você pode clicar na guia Histórico para ver o histórico da tabela exibido com DESCRIBE HISTORY
o .
Esquema do histórico
A saída da history
operação tem as seguintes colunas.
Column | Tipo | Description |
---|---|---|
versão | long | Versão da tabela gerada pela operação. |
carimbo de data/hora | carimbo de data/hora | Quando esta versão foi confirmada. |
ID de Utilizador | string | ID do usuário que executou a operação. |
nome de utilizador | string | Nome do usuário que executou a operação. |
operation | string | Nome da operação. |
operationParameters | map | Parâmetros da operação (por exemplo, predicados.) |
tarefa | estruturar | Detalhes do trabalho que executou a operação. |
bloco de notas | estruturar | Detalhes do caderno a partir do qual a operação foi executada. |
clusterId | string | ID do cluster no qual a operação foi executada. |
readVersion | long | Versão da tabela que foi lida para executar a operação de gravação. |
Nível de isolamento | string | Nível de isolamento usado para esta operação. |
isBlindAppend | boolean | Se esta operação anexou dados. |
operationMetrics | map | Métricas da operação (por exemplo, número de linhas e arquivos modificados.) |
userMetadata | string | Metadados de confirmação definidos pelo usuário, se especificados |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Nota
- Algumas das outras colunas não estarão disponíveis se você escrever em uma tabela Delta usando os seguintes métodos:
- As colunas adicionadas no futuro serão sempre adicionadas após a última coluna.
Chaves de métricas de operação
A history
operação retorna uma coleção de métricas de operações no mapa de operationMetrics
colunas.
As tabelas a seguir listam as definições de chave do mapa por operação.
Operação | Nome da métrica | Description |
---|---|---|
ESCREVER, CRIAR TABELA COMO SELECIONAR, SUBSTITUIR TABELA COMO SELECIONAR, COPIAR PARA | ||
numFiles | Número de arquivos gravados. | |
numOutputBytes | Tamanho em bytes do conteúdo escrito. | |
numOutputRows | Número de linhas escritas. | |
ATUALIZAÇÃO DE STREAMING | ||
numAddedFiles | Número de ficheiros adicionados. | |
numRemovedFiles | Número de ficheiros removidos. | |
numOutputRows | Número de linhas escritas. | |
numOutputBytes | Tamanho da escrita em bytes. | |
DELETE | ||
numAddedFiles | Número de ficheiros adicionados. Não fornecido quando as partições da tabela são excluídas. | |
numRemovedFiles | Número de ficheiros removidos. | |
numDeletedRows | Número de linhas removidas. Não fornecido quando as partições da tabela são excluídas. | |
numCopiedRows | Número de linhas copiadas no processo de exclusão de arquivos. | |
executionTimeMs | Tempo necessário para executar toda a operação. | |
scanTimeMs | Tempo necessário para verificar os arquivos em busca de correspondências. | |
rewriteTimeMs | Tempo necessário para reescrever os arquivos correspondentes. | |
TRUNCATE | ||
numRemovedFiles | Número de ficheiros removidos. | |
executionTimeMs | Tempo necessário para executar toda a operação. | |
MESCLAR | ||
numSourceRows | Número de linhas no DataFrame de origem. | |
numTargetRowsInserted | Número de linhas inseridas na tabela de destino. | |
numTargetRowsUpdated | Número de linhas atualizadas na tabela de destino. | |
numTargetRowsDeleted | Número de linhas excluídas na tabela de destino. | |
numTargetRowsCopiado | Número de linhas de destino copiadas. | |
numOutputRows | Número total de linhas escritas. | |
numTargetFilesAdded | Número de arquivos adicionados ao coletor (destino). | |
numTargetFilesRemovido | Número de arquivos removidos do coletor (destino). | |
executionTimeMs | Tempo necessário para executar toda a operação. | |
scanTimeMs | Tempo necessário para verificar os arquivos em busca de correspondências. | |
rewriteTimeMs | Tempo necessário para reescrever os arquivos correspondentes. | |
ATUALIZAR | ||
numAddedFiles | Número de ficheiros adicionados. | |
numRemovedFiles | Número de ficheiros removidos. | |
numUpdatedRows | Número de linhas atualizadas. | |
numCopiedRows | Número de linhas apenas copiadas no processo de atualização de arquivos. | |
executionTimeMs | Tempo necessário para executar toda a operação. | |
scanTimeMs | Tempo necessário para verificar os arquivos em busca de correspondências. | |
rewriteTimeMs | Tempo necessário para reescrever os arquivos correspondentes. | |
FSCK | numRemovedFiles | Número de ficheiros removidos. |
CONVERTER | numConvertedFiles | Número de arquivos Parquet que foram convertidos. |
OPTIMIZE | ||
numAddedFiles | Número de ficheiros adicionados. | |
numRemovedFiles | Número de arquivos otimizados. | |
numAddedBytes | Número de bytes adicionados depois que a tabela foi otimizada. | |
numRemovedBytes | Número de bytes removidos. | |
minFileSize | Tamanho do menor arquivo depois que a tabela foi otimizada. | |
p25FileSize | Tamanho do arquivo de percentil 25 depois que a tabela foi otimizada. | |
p50FileSize | Tamanho médio do arquivo após a otimização da tabela. | |
p75FileSize | Tamanho do arquivo de percentil 75 depois que a tabela foi otimizada. | |
maxFileSize | Tamanho do arquivo maior depois que a tabela foi otimizada. | |
CLONE | ||
sourceTableSize | Tamanho em bytes da tabela de origem na versão clonada. | |
sourceNumOfFiles | Número de arquivos na tabela de origem na versão clonada. | |
numRemovedFiles | Número de arquivos removidos da tabela de destino se uma tabela Delta anterior foi substituída. | |
removedFilesSize | Tamanho total, em bytes, dos arquivos removidos da tabela de destino se uma tabela Delta anterior tiver sido substituída. | |
numCopiedFiles | Número de arquivos que foram copiados para o novo local. 0 para clones superficiais. | |
copiedFilesSize | Tamanho total em bytes dos arquivos que foram copiados para o novo local. 0 para clones superficiais. | |
RESTAURAR | ||
tableSizeAfterRestore | Tamanho da tabela em bytes após a restauração. | |
numOfFilesAfterRestore | Número de arquivos na tabela após a restauração. | |
numRemovedFiles | Número de ficheiros removidos pela operação de restauro. | |
numRestoredFiles | Número de arquivos que foram adicionados como resultado da restauração. | |
removedFilesSize | Tamanho em bytes dos arquivos removidos pela restauração. | |
restoredFilesSize | Tamanho em bytes dos arquivos adicionados pela restauração. | |
VACUUM | ||
numDeletedFiles | Número de ficheiros eliminados. | |
numVacuumedDirectories | Número de diretórios aspirados. | |
numFilesToDelete | Número de ficheiros a eliminar. |
O que é Delta Lake viagem no tempo?
A viagem no tempo do Delta Lake suporta a consulta de versões anteriores da tabela com base no carimbo de data/hora ou na versão da tabela (conforme registrado no log de transações). Você pode usar a viagem no tempo para aplicativos como os seguintes:
- Recriar análises, relatórios ou saídas (por exemplo, a saída de um modelo de aprendizado de máquina). Isso pode ser útil para depuração ou auditoria, especialmente em setores regulamentados.
- Escrever consultas temporais complexas.
- Corrigir erros nos seus dados.
- Fornecendo isolamento de instantâneo para um conjunto de consultas para tabelas de mudança rápida.
Importante
As versões de tabela acessíveis com viagens no tempo são determinadas por uma combinação do limite de retenção para arquivos de log de transações e a frequência e retenção especificada para VACUUM
operações. Se você executar VACUUM
diariamente com os valores padrão, 7 dias de dados estarão disponíveis para viagem no tempo.
Sintaxe de viagem no tempo delta
Você consulta uma tabela Delta com viagem no tempo adicionando uma cláusula após a especificação do nome da tabela.
timestamp_expression
pode ser qualquer um:'2018-10-18T22:15:12.013Z'
, ou seja, uma cadeia de caracteres que pode ser convertida em um carimbo de data/horacast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, ou seja, uma cadeia de caracteres de datacurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Qualquer outra expressão que seja ou possa ser convertida em um carimbo de data/hora
version
é um valor longo que pode ser obtido a partir da saída deDESCRIBE HISTORY table_spec
.
Nem timestamp_expression
version
podem ser subconsultas.
Somente cadeias de caracteres de carimbo de data ou hora são aceitas. Por exemplo, "2019-01-01"
e "2019-01-01T00:00:00.000Z"
. Consulte o seguinte código para exemplo de sintaxe:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Você também pode usar a @
sintaxe para especificar o carimbo de data/hora ou a versão como parte do nome da tabela. O carimbo de data/hora deve estar no yyyyMMddHHmmssSSS
formato. Você pode especificar uma versão depois @
antecipando a v
para a versão. Consulte o seguinte código para exemplo de sintaxe:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
O que são pontos de verificação do log de transações?
O Delta Lake registra as versões da tabela como arquivos JSON dentro do diretório, que é armazenado junto com os dados da _delta_log
tabela. Para otimizar a consulta de pontos de verificação, o Delta Lake agrega versões de tabela a arquivos de ponto de verificação Parquet, evitando a necessidade de ler todas as versões JSON do histórico de tabelas. O Azure Databricks otimiza a frequência de pontos de verificação para o tamanho dos dados e a carga de trabalho. Os usuários não devem precisar interagir diretamente com os pontos de verificação. A frequência do ponto de verificação está sujeita a alterações sem aviso prévio.
Configurar a retenção de dados para consultas de viagem no tempo
Para consultar uma versão anterior da tabela, você deve manter o log e os arquivos de dados dessa versão.
Os arquivos de dados são excluídos quando VACUUM
executados em uma tabela. O Delta Lake gerencia a remoção do arquivo de log automaticamente após as versões da tabela de pontos de verificação.
Como a maioria das tabelas Delta tem VACUUM
sido executada regularmente, as consultas point-in-time devem respeitar o limite de retenção do VACUUM
, que é de 7 dias por padrão.
Para aumentar o limite de retenção de dados para tabelas Delta, você deve configurar as seguintes propriedades da tabela:
delta.logRetentionDuration = "interval <interval>"
: controla por quanto tempo o histórico de uma tabela é mantido. A predefinição éinterval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: Determina o limiteVACUUM
usado para remover arquivos de dados que não são mais referenciados na versão atual da tabela. A predefinição éinterval 7 days
.
Você pode especificar propriedades Delta durante a criação da tabela ou defini-las com uma ALTER TABLE
instrução. Consulte Referência de propriedades da tabela delta.
Nota
Você deve definir ambas as propriedades para garantir que o histórico de tabelas seja mantido por mais tempo para tabelas com operações frequentes VACUUM
. Por exemplo, para acessar 30 dias de dados históricos, defina delta.deletedFileRetentionDuration = "interval 30 days"
(que corresponde à configuração padrão para delta.logRetentionDuration
).
Aumentar o limite de retenção de dados pode fazer com que os custos de armazenamento aumentem, à medida que mais arquivos de dados são mantidos.
Restaurar uma tabela Delta para um estado anterior
Você pode restaurar uma tabela Delta para seu estado anterior usando o RESTORE
comando. Uma tabela Delta mantém internamente versões históricas da tabela que lhe permitem ser restauradas para um estado anterior.
Uma versão correspondente ao estado anterior ou a um carimbo de data/hora de quando o estado anterior foi criado é suportada como opções pelo comando RESTORE
.
Importante
- Você pode restaurar uma tabela já restaurada.
- Você pode restaurar uma tabela clonada .
- Você deve ter
MODIFY
permissão na tabela que está sendo restaurada. - Não é possível restaurar uma tabela para uma versão mais antiga em que os arquivos de dados foram excluídos manualmente ou pelo
vacuum
. A restauração parcial para esta versão ainda é possível sespark.sql.files.ignoreMissingFiles
estiver definida comotrue
. - O formato de carimbo de data/hora para restaurar para um estado anterior é
yyyy-MM-dd HH:mm:ss
. O fornecimento apenas de uma cadeia de caracteres date(yyyy-MM-dd
) também é suportado.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Para obter detalhes de sintaxe, consulte RESTAURAR.
Importante
A restauração é considerada uma operação de alteração de dados. As entradas de log do Delta Lake adicionadas RESTORE
pelo comando contêm dataChange definido como true. Se houver um aplicativo downstream, como um trabalho de streaming estruturado que processa as atualizações para uma tabela Delta Lake, as entradas do log de alteração de dados adicionadas pela operação de restauração serão consideradas como novas atualizações de dados e processá-las poderá resultar em dados duplicados.
Por exemplo:
Versão da tabela | Operação | Atualizações de log delta | Registros em atualizações de log de alteração de dados |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (nome = Viktor, idade = 29, (nome = George, idade = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (nome = George, idade = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Sem registros, pois a compactação Otimize não altera os dados na tabela) |
3 | RESTAURAR(versão=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (nome = Viktor, idade = 29), (nome = George, idade = 55), (nome = George, idade = 39) |
No exemplo anterior, o RESTORE
comando resulta em atualizações que já foram vistas ao ler a tabela Delta versão 0 e 1. Se uma consulta de streaming estava lendo esta tabela, esses arquivos serão considerados como dados recém-adicionados e serão processados novamente.
Restaurar métricas
RESTORE
relata as seguintes métricas como um DataFrame de linha única quando a operação for concluída:
table_size_after_restore
: O tamanho da tabela após a restauração.num_of_files_after_restore
: O número de arquivos na tabela após a restauração.num_removed_files
: Número de ficheiros removidos (logicamente eliminados) da tabela.num_restored_files
: Número de arquivos restaurados devido à reversão.removed_files_size
: Tamanho total em bytes dos arquivos que são removidos da tabela.restored_files_size
: Tamanho total em bytes dos arquivos que são restaurados.
Exemplos de utilização de viagens no tempo do Delta Lake
Corrigir exclusões acidentais em uma tabela para o usuário
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Corrigir atualizações incorretas acidentais em uma tabela:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Consulte o número de novos clientes adicionados na última semana.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Como posso encontrar a última versão de confirmação na sessão do Spark?
Para obter o número da versão da última confirmação escrita pela corrente SparkSession
em todos os threads e todas as tabelas, consulte a configuração spark.databricks.delta.lastCommitVersionInSession
SQL .
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Se nenhuma confirmação tiver sido feita pelo SparkSession
, consultar a chave retornará um valor vazio.
Nota
Se você compartilhar o mesmo SparkSession
em vários threads, é semelhante ao compartilhamento de uma variável em vários threads: você pode atingir as condições de corrida à medida que o valor de configuração é atualizado simultaneamente.