Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Cada operação que modifica uma tabela cria uma nova versão da tabela. Use informação de histórico para auditar operações, reverter uma tabela ou consultar uma tabela num ponto específico do tempo usando a viagem no tempo.
Nota
O Databricks não recomenda usar o histórico de tabelas como solução de backup a longo prazo para arquivamento de dados. Use apenas os últimos 7 dias para operações de viagem no tempo, a menos que tenha definido tanto as configurações de dados como de retenção de registos para um valor maior.
Recuperar histórico da tabela
Recupere informações, incluindo as operações, o utilizador e o carimbo temporal de cada escrita numa tabela executando o comando history. As operações são retornadas em ordem cronológica inversa.
A retenção do histórico da tabela é determinada pela configuração da tabela logRetentionDuration, que é de 30 dias por padrão.
Nota
A viagem no tempo e o histórico da tabela são controlados por diferentes limites de retenção. Vê O que é a viagem no tempo?
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Para obter detalhes da sintaxe do Spark SQL, consulte DESCRIBE HISTORY.
Consulte a documentação da API Delta Lake para obter detalhes da sintaxe Scala/Java/Python.
O Explorador de Catálogos fornece uma vista visual desta informação detalhada e história da tabela. Além do esquema da tabela e dos dados de exemplo, pode clicar no separador Histórico para ver o histórico da tabela exibido com DESCRIBE HISTORY.
Esquema do histórico
A saída da operação history tem as seguintes colunas.
| Coluna | Tipo | Descrição |
|---|---|---|
| versão | longo | Versão da tabela gerada pela operação. |
| carimbo de data/hora | carimbo de data/hora | Quando esta versão foi confirmada. |
| ID de Utilizador | cadeia (de caracteres) | ID do usuário que executou a operação. |
| nome de utilizador | cadeia (de caracteres) | Nome do usuário que executou a operação. |
| operação | cadeia (de caracteres) | Nome da operação. |
| parâmetros de operação | mapa | Parâmetros da operação (por exemplo, predicados.) |
| tarefa | estrutura | Detalhes do trabalho que executou a operação. |
| bloco de notas | estrutura | Detalhes do caderno a partir do qual a operação foi executada. |
| clusterId | cadeia (de caracteres) | ID do cluster no qual a operação foi executada. |
| lerVersão | longo | Versão da tabela que foi lida para executar a operação de gravação. |
| Nível de isolamento | cadeia (de caracteres) | Nível de isolamento usado para esta operação. |
| isBlindAppend | Booleano | Se esta operação anexou dados. |
| operationMetrics | mapa | Métricas da operação (por exemplo, número de linhas e arquivos modificados.) |
| MetadadosDoUsuário | cadeia (de caracteres) | Metadados de confirmação definidos pelo usuário, se especificados |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Nota
- Algumas das outras colunas não estão disponíveis se escrever numa tabela usando os seguintes métodos:
- As colunas adicionadas no futuro serão sempre adicionadas após a última coluna.
Compreensão partitionBy dos parâmetros de operação
O partitionBy campo só é significativo para operações CREATE e OVERWRITE que definem ou alteram o esquema de partição de uma tabela.
Para operações de acréscimo a tabelas existentes (APPEND, INSERT, UPDATE, DELETE, MERGE), este campo pode mostrar uma matriz [] vazia ou colunas de partição, dependendo do método de gravação usado (.save() vs .saveAsTable()). Essa inconsistência é um comportamento esperado e não deve ser usada para validar gravações.
Importante
Não confie partitionBy no histórico para validar operações de acréscimo. O valor varia com base nos detalhes da implementação, mas não afeta como os dados são gravados nas partições.
Example
Considere uma tabela particionada pela date coluna:
# Initial table creation - partitionBy is populated
df.write.format("delta") \
.partitionBy("date") \
.saveAsTable("sales_data")
A operação CREATE no histórico mostra:
operationParameters: {
"mode": "ErrorIfExists",
"partitionBy": "[\"date\"]"
}
Quando você acrescenta dados a esta tabela:
# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
.mode("append") \
.saveAsTable("sales_data")
A operação APPEND mostra:
operationParameters: {
"mode": "Append",
"partitionBy": "[]"
}
O valor vazio partitionBy é esperado. Os dados ainda são gravados nas partições corretas com base no esquema de partição existente da tabela. Note que .save() para um caminho específico pode mostrar colunas de partição neste campo, mas essa diferença é um detalhe de implementação e não afeta o comportamento de gravação.
Métricas de operação
A operação history retorna uma coleção de métricas de operações no mapa de colunas operationMetrics.
As tabelas a seguir listam as definições de chave do mapa por operação.
| Operação | Nome da métrica | Descrição |
|---|---|---|
| ESCREVA, CREATE TABLE COMO SELECT, SUBSTITUA TABLE COMO SELECT, COPY INTO | ||
| número de ficheiros | Número de arquivos gravados. | |
| numOutputBytes | Tamanho em bytes do conteúdo escrito. | |
| numOutputRows | Número de linhas escritas. | |
| Transmissão UPDATE | ||
| númeroDeFicheirosAdicionados | Número de ficheiros adicionados. | |
| numFicheirosRemovidos | Número de ficheiros removidos. | |
| numOutputRows | Número de linhas escritas. | |
| numOutputBytes | Tamanho da escrita em bytes. | |
| SUPRIMIR | ||
| númeroDeFicheirosAdicionados | Número de ficheiros adicionados. Não fornecido quando as partições da tabela são excluídas. | |
| numFicheirosRemovidos | Número de ficheiros removidos. | |
| Número de Linhas Eliminadas | Número de linhas removidas. Não fornecido quando as partições da tabela são excluídas. | |
| numCopiedRows | Número de linhas copiadas no processo de exclusão de arquivos. | |
| tempoDeExecucaoMs | Tempo necessário para executar toda a operação. | |
| scanTimeMs | Tempo necessário para verificar os arquivos em busca de correspondências. | |
| rewriteTimeMs | Tempo necessário para reescrever os arquivos correspondentes. | |
| Truncar | ||
| numFicheirosRemovidos | Número de ficheiros removidos. | |
| tempoDeExecucaoMs | Tempo necessário para executar toda a operação. | |
| MESCLAR | ||
| numSourceRows | Número de linhas no DataFrame de origem. | |
| numTargetRowsInserted | Número de linhas inseridas na tabela de destino. | |
| numTargetRowsUpdated | Número de linhas atualizadas na tabela de destino. | |
| numTargetRowsDeleted | Número de linhas excluídas na tabela de destino. | |
| numTargetRowsCopiado | Número de linhas de destino copiadas. | |
| numOutputRows | Número total de linhas escritas. | |
| NúmeroDeFicheirosAlvoAdicionados | Número de arquivos adicionados ao coletor (destino). | |
| numTargetFilesRemovido | Número de arquivos removidos do coletor (destino). | |
| tempoDeExecucaoMs | Tempo necessário para executar toda a operação. | |
| scanTimeMs | Tempo necessário para verificar os arquivos em busca de correspondências. | |
| rewriteTimeMs | Tempo necessário para reescrever os arquivos correspondentes. | |
| UPDATE | ||
| númeroDeFicheirosAdicionados | Número de ficheiros adicionados. | |
| numFicheirosRemovidos | Número de ficheiros removidos. | |
| numUpdatedRows | Número de linhas atualizadas. | |
| numCopiedRows | Número de linhas apenas copiadas no processo de atualização de arquivos. | |
| tempoDeExecucaoMs | Tempo necessário para executar toda a operação. | |
| scanTimeMs | Tempo necessário para verificar os arquivos em busca de correspondências. | |
| rewriteTimeMs | Tempo necessário para reescrever os arquivos correspondentes. | |
| FSCK | numFicheirosRemovidos | Número de ficheiros removidos. |
| CONVERTER | Número de Ficheiros Convertidos | Número de arquivos Parquet que foram convertidos. |
| OPTIMIZE | ||
| númeroDeFicheirosAdicionados | Número de ficheiros adicionados. | |
| numFicheirosRemovidos | Número de arquivos otimizados. | |
| numAddedBytes | Número de bytes adicionados depois que a tabela foi otimizada. | |
| numRemovedBytes | Número de bytes removidos. | |
| tamanhoMinimoDeFicheiro | Tamanho do menor arquivo depois que a tabela foi otimizada. | |
| p25FileSize | Tamanho do arquivo de percentil 25 depois que a tabela foi otimizada. | |
| p50FileSize | Tamanho médio do arquivo após a otimização da tabela. | |
| p75FileSize | Tamanho do ficheiro do percentil 75 após a tabela ter sido otimizada. | |
| tamanhoMáximoDoFicheiro | Tamanho do arquivo maior depois que a tabela foi otimizada. | |
| clone | ||
| tamanhoDaTabelaFonte | Tamanho em bytes da tabela de origem na versão clonada. | |
| sourceNumOfFiles | Número de arquivos na tabela de origem na versão clonada. | |
| numFicheirosRemovidos | Número de ficheiros removidos da tabela de destino se uma tabela anterior fosse substituída. | |
| removedFilesSize | Tamanho total em bytes dos ficheiros removidos da tabela de destino caso uma tabela anterior tenha sido substituída. | |
| numFicheirosCopiados | Número de arquivos que foram copiados para o novo local. 0 para clones superficiais. | |
| tamanhoDeFicheirosCopiados | Tamanho total em bytes dos arquivos que foram copiados para o novo local. 0 para clones superficiais. | |
| RESTORE | ||
| tamanhoDaTabelaApósRestauração | Tamanho da tabela em bytes após a restauração. | |
| numDeFicheirosApósRestauro | Número de arquivos na tabela após a restauração. | |
| numFicheirosRemovidos | Número de ficheiros removidos pela operação de restauro. | |
| númeroDeFicheirosRestaurados | Número de arquivos que foram adicionados como resultado da restauração. | |
| removedFilesSize | Tamanho em bytes dos arquivos removidos pela restauração. | |
| tamanhoDosFicheirosRestaurados | Tamanho em bytes dos arquivos adicionados pela restauração. | |
| VACUUM | ||
| númeroFicheirosEliminados | Número de ficheiros eliminados. | |
| numVacuumedDirectories | Número de diretórios aspirados. | |
| númeroFicheirosParaEliminar | Número de ficheiros a eliminar. |
O que é a viagem no tempo?
A viagem no tempo suporta consultar versões anteriores das tabelas, registadas no registo de transações, com base no timestamp ou na versão da tabela. Você pode usar a viagem no tempo para aplicativos como os seguintes:
- Recriar análises, relatórios ou saídas (por exemplo, a saída de um modelo de aprendizado de máquina). Isso pode ser útil para depuração ou auditoria, especialmente em setores regulamentados.
- Escrever consultas temporais complexas.
- Corrigir erros nos seus dados.
- Proporcionar isolamento de instantâneo para um conjunto de consultas em tabelas que mudam rapidamente.
Importante
No Databricks Runtime 18.0 e superiores, as consultas de versão temporal são bloqueadas se solicitarem uma versão anterior à propriedade da tabela deletedFileRetentionDuration (padrão 7 dias). Para tabelas geridas pelo Unity Catalog, isto aplica-se ao Databricks Runtime 12.2 e superiores.
Sintaxe da viagem no tempo
Consulta uma tabela com viagem no tempo adicionando uma cláusula após a especificação do nome da tabela.
-
timestamp_expressionpode ser qualquer um:-
'2018-10-18T22:15:12.013Z', ou seja, uma cadeia de caracteres que pode ser convertida em um carimbo de data/hora cast('2018-10-18 13:36:32 CEST' as timestamp)-
'2018-10-18', ou seja, uma cadeia de caracteres de data current_timestamp() - interval 12 hoursdate_sub(current_date(), 1)- Qualquer outra expressão que seja ou possa ser convertida em um carimbo de data/hora
-
-
versioné um valor longo que pode ser obtido a partir da saída deDESCRIBE HISTORY table_spec.
Nem timestamp_expressionversion podem ser subconsultas.
Somente cadeias de caracteres de carimbo de data ou hora são aceitas. Por exemplo, "2019-01-01" e "2019-01-01T00:00:00.000Z". Consulte o seguinte código para exemplo de sintaxe:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Você também pode usar a sintaxe @ para especificar o carimbo de data/hora ou a versão como parte do nome da tabela. O carimbo de data/hora deve estar no yyyyMMddHHmmssSSS formato. Você pode especificar uma versão depois @ antecipando a v para a versão. Consulte o seguinte código para exemplo de sintaxe:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
O que são pontos de verificação do log de transações?
As versões das tabelas são registadas como ficheiros JSON dentro do diretório de registo de transações, que é armazenado juntamente com os dados das tabelas. Para otimizar a consulta por checkpoints, as versões das tabelas são agregadas em ficheiros de checkpoint Parquet, evitando a necessidade de ler todas as versões JSON do histórico das tabelas. O Azure Databricks otimiza a frequência de pontos de verificação para o tamanho dos dados e a carga de trabalho. Os usuários não devem precisar interagir diretamente com os pontos de verificação. A frequência do ponto de verificação está sujeita a alterações sem aviso prévio.
Configurar a retenção de dados para consultas de viagem no tempo
Para consultar uma versão anterior da tabela, deve-se manter e os arquivos de log e de dados dessa versão.
Os arquivos de dados são eliminados quando VACUUM é executado numa tabela. A remoção de ficheiros de registo é gerida automaticamente após as versões das tabelas de checkpointing.
Como a maioria das tabelas é VACUUM executada contra elas regularmente, as consultas pontuais devem respeitar o limiar de retenção para VACUUM, que é 7 dias por padrão.
Para aumentar o limiar de retenção de dados para tabelas, deve configurar as seguintes propriedades de tabela:
-
delta.logRetentionDuration = "interval <interval>": controla por quanto tempo o histórico de uma tabela é mantido. A predefinição éinterval 30 days. -
delta.deletedFileRetentionDuration = "interval <interval>": determina o limiteVACUUMusa para remover arquivos de dados que não são mais referenciados na versão atual da tabela. A predefinição éinterval 7 days.
Pode especificar propriedades de tabela durante a criação da tabela ou defini-las com uma ALTER TABLE instrução. Consulte Referência de Propriedades da Tabela.
Nota
Em Databricks Runtime 18.0 e superiores, logRetentionDuration deve ser maior ou igual a deletedFileRetentionDuration. Para tabelas geridas pelo Unity Catalog, isto aplica-se ao Databricks Runtime 12.2 e superiores.
Para aceder a 30 dias de dados históricos, defina delta.deletedFileRetentionDuration = "interval 30 days" (que corresponde à definição padrão para delta.logRetentionDuration).
Aumentar o limite de retenção de dados pode fazer com que os custos de armazenamento aumentem, à medida que mais arquivos de dados são mantidos.
Restaurar uma tabela a um estado anterior
Podes restaurar uma tabela ao seu estado anterior usando o RESTORE comando. As tabelas mantêm internamente versões históricas que permitem restaurá-las a um estado anterior.
Uma versão correspondente ao estado anterior ou a um carimbo de data/hora de quando o estado anterior foi criado é suportada como opções pelo comando RESTORE.
Importante
- Você pode restaurar uma tabela já restaurada.
- Você pode restaurar uma tabela clonada .
- Você deve ter
MODIFYpermissão na tabela que está sendo restaurada. - Não é possível restaurar uma tabela para uma versão mais antiga onde os arquivos de dados foram excluídos manualmente ou por
vacuum. A restauração parcial para esta versão ainda é possível sespark.sql.files.ignoreMissingFilesestiver definida comotrue. - O formato de carimbo de data/hora para restaurar para um estado anterior é
yyyy-MM-dd HH:mm:ss. O fornecimento apenas de uma cadeia de caracteres date(yyyy-MM-dd) também é suportado.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Para obter detalhes de sintaxe, consulte RESTORE.
Importante
A restauração é considerada uma operação de alteração de dados. As entradas de registo adicionadas pelo RESTORE comando contêm dataChange definido como true. Se existir uma aplicação a jusante, como um trabalho de streaming estruturado que processa as atualizações de uma tabela, as entradas do registo de alterações de dados adicionadas pela operação de restauro são consideradas novas atualizações de dados, e o seu processamento pode resultar em dados duplicados.
Por exemplo:
| Versão da tabela | Operação | Atualizações dos registos | Registros em atualizações de log de alteração de dados |
|---|---|---|---|
| 0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (nome = Viktor, idade = 29, (nome = George, idade = 55) |
| 1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (nome = George, idade = 39) |
| 2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Sem registros, pois a compactação Otimize não altera os dados na tabela) |
| 3 | RESTORE(versão=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (nome = Viktor, idade = 29), (nome = George, idade = 55), (nome = George, idade = 39) |
No exemplo anterior, o RESTORE comando resulta em atualizações que já tinham sido observadas ao ler as versões 0 e 1 da tabela. Se uma consulta de streaming estava lendo esta tabela, esses arquivos serão considerados como dados recém-adicionados e serão processados novamente.
Restaurar métricas
RESTORE relata as seguintes métricas como um DataFrame de linha única quando a operação for concluída:
table_size_after_restore: O tamanho da tabela após a restauração.num_of_files_after_restore: O número de arquivos na tabela após a restauração.num_removed_files: Número de ficheiros removidos (logicamente eliminados) da tabela.num_restored_files: Número de arquivos restaurados devido à reversão.removed_files_size: Tamanho total, em bytes, dos ficheiros que são removidos da tabela.restored_files_size: Tamanho total em bytes dos arquivos que são restaurados.
Exemplos de utilização de viagens no tempo
Corrigir exclusões acidentais em uma tabela para o usuário
111:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111Corrigir atualizações incorretas acidentais em uma tabela:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *Consulte o número de novos clientes adicionados na última semana.
SELECT ( SELECT count(distinct userId) FROM my_table ) - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7) ) AS new_customers
Como posso encontrar a versão do último commit na sessão do Spark?
Para obter o número da versão do último commit feito pelo atual SparkSession em todos os threads e em todas as tabelas, execute uma consulta na configuração SQL spark.databricks.delta.lastCommitVersionInSession.
Nota
Para tabelas Apache Iceberg, use spark.databricks.iceberg.lastCommitVersionInSession em vez de spark.databricks.delta.lastCommitVersionInSession.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
linguagem de programação Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Se nenhuma confirmação tiver sido feita pelo SparkSession, consultar a chave retornará um valor vazio.
Nota
Se partilhar o mesmo SparkSession em várias linhas de execução, é semelhante ao compartilhamento de uma variável em várias linhas de execução; pode encontrar condições de concorrência quando o valor de configuração é atualizado simultaneamente.