Trabajar con el historial de tablas de Delta Lake

Cada operación que modifica una tabla de Delta Lake crea una nueva versión de tabla. Puede usar información de historial para auditar las operaciones, revertir una tabla o consultar una tabla en un momento dado específico mediante el viaje en el tiempo.

Nota:

Databricks no recomienda usar el historial de tablas de Delta Lake como una solución de copia de seguridad a largo plazo para el archivado de datos. Databricks recomienda usar solo los últimos 7 días para las operaciones de viaje en el tiempo, a menos que haya establecido configuraciones de retención de datos y registros en un valor mayor.

Recuperación del historial de la tabla Delta

Puede recuperar información incluidas las operaciones, el usuario, la marca de tiempo para cada escritura en una tabla Delta mediante la ejecución del comando history. Las operaciones se devuelven en orden cronológico inverso.

La retención del historial de tablas viene determinada por la configuración de tabla delta.logRetentionDuration, que es de 30 días de manera predeterminada.

Nota:

Los distintos umbrales de retención controlan el historial de tabla y el viaje en el tiempo. Consulte ¿Qué es el viaje en el tiempo de Delta Lake?

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

Para obtener información sobre la sintaxis de Spark SQL, consulte DESCRIBE HISTORY.

Para obtener información sobre la sintaxis de Scala, Java y Python, consulte la Documentación de la API de Delta Lake.

Catalog Explorer proporciona una vista de esta información detallada de las tablas y el historial de las tablas Delta. Además del esquema de la tabla y los datos de ejemplo, puede hacer clic en la pestaña History (Historial) para ver el historial de la tabla que se muestra con DESCRIBE HISTORY.

Esquema del historial

La salida de la operación history tiene las columnas siguientes.

Columna Type Descripción
version long Versión de tabla generada por la operación.
timestamp timestamp Cuándo se ha confirmado esta versión.
userId string Identificador del usuario que ejecutó la operación.
userName string Nombre del usuario que ejecutó la operación.
operation string Nombre de la operación.
operationParameters map Parámetros de la operación (por ejemplo, predicados).
trabajo struct Detalles del trabajo que ejecutó la operación.
notebook struct Detalles del cuaderno desde el que se ha ejecutado la operación.
clusterId string Identificador del clúster en el que se ejecutó la operación.
readVersion long Versión de la tabla que se leyó para realizar la operación de escritura.
isolationLevel string Nivel de aislamiento utilizado para esta operación.
isBlindAppend boolean Indica si esta operación ha anexado datos.
operationMetrics map Métricas de la operación (por ejemplo, el número de filas y archivos modificados).
userMetadata string Metadatos de confirmación definidos por el usuario, si se especificaron
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Nota:

Claves de métricas de operación

La operación history devuelve una colección de métricas de operaciones en la asignación de columnas operationMetrics.

En las tablas siguientes, se muestran las definiciones de clave de asignación por operación.

Operación Nombre de métrica Descripción
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
numFiles Número de archivos escritos.
numOutputBytes Tamaño en bytes del contenido escrito.
numOutputRows Número de filas escritas.
STREAMING UPDATE
numAddedFiles Número de archivos agregados.
numRemovedFiles Número de archivos eliminados.
numOutputRows Número de filas escritas.
numOutputBytes Tamaño de escritura en bytes.
Delete
numAddedFiles Número de archivos agregados. No se proporciona cuando se eliminan las particiones de la tabla.
numRemovedFiles Número de archivos eliminados.
numDeletedRows Número de filas eliminadas. No se proporciona cuando se eliminan las particiones de la tabla.
numCopiedRows Número de filas copiadas en el proceso de eliminación de archivos.
executionTimeMs Tiempo que se necesitó para ejecutar toda la operación.
scanTimeMs Tiempo que se necesitó para examinar los archivos en busca de coincidencias.
rewriteTimeMs Tiempo que se necesitó para volver a escribir los archivos coincidentes.
TRUNCATE
numRemovedFiles Número de archivos eliminados.
executionTimeMs Tiempo que se necesitó para ejecutar toda la operación.
MERGE
numSourceRows Número de filas del DataFrame de origen.
numTargetRowsInserted Número de filas insertadas en la tabla de destino.
numTargetRowsUpdated Número de filas actualizadas en la tabla de destino.
numTargetRowsDeleted Número de filas eliminadas en la tabla de destino.
numTargetRowsCopied Número de filas de destino copiadas.
numOutputRows Número total de filas escritas.
numTargetFilesAdded Número de archivos agregados al receptor (destino).
numTargetFilesRemoved Número de archivos eliminados del receptor (destino).
executionTimeMs Tiempo que se necesitó para ejecutar toda la operación.
scanTimeMs Tiempo que se necesitó para examinar los archivos en busca de coincidencias.
rewriteTimeMs Tiempo que se necesitó para volver a escribir los archivos coincidentes.
UPDATE
numAddedFiles Número de archivos agregados.
numRemovedFiles Número de archivos eliminados.
numUpdatedRows Número de filas actualizadas.
numCopiedRows Número de filas que se acaban de copiar en el proceso de actualización de archivos.
executionTimeMs Tiempo que se necesitó para ejecutar toda la operación.
scanTimeMs Tiempo que se necesitó para examinar los archivos en busca de coincidencias.
rewriteTimeMs Tiempo que se necesitó para volver a escribir los archivos coincidentes.
FSCK numRemovedFiles Número de archivos eliminados.
CONVERT numConvertedFiles Número de archivos Parquet que se han convertido.
OPTIMIZE
numAddedFiles Número de archivos agregados.
numRemovedFiles Número de archivos optimizados.
numAddedBytes Número de bytes agregados después de optimizar la tabla.
numRemovedBytes Número de bytes eliminados.
minFileSize Tamaño del archivo más pequeño después de optimizar la tabla.
p25FileSize Tamaño del archivo del percentil 25 después de optimizar la tabla.
p50FileSize Tamaño medio del archivo después de optimizar la tabla.
p75FileSize Tamaño del archivo del percentil 75 después de optimizar la tabla.
maxFileSize Tamaño del archivo más grande después de optimizar la tabla.
CLONE
sourceTableSize Tamaño en bytes de la tabla de origen en la versión clonada.
sourceNumOfFiles Número de archivos de la tabla de origen en la versión clonada.
numRemovedFiles Número de archivos eliminados de la tabla de destino si se ha reemplazado una tabla delta anterior.
removedFilesSize Tamaño total en bytes de los archivos eliminados de la tabla de destino si se ha reemplazado una tabla delta anterior.
numCopiedFiles Número de archivos que se copiaron en la nueva ubicación. 0 para clones superficiales.
copiedFilesSize Tamaño total en bytes de los archivos que se copiaron en la nueva ubicación. 0 para clones superficiales.
RESTORE
tableSizeAfterRestore Tamaño de la tabla en bytes después de la restauración.
numOfFilesAfterRestore Número de archivos de la tabla después de la restauración.
numRemovedFiles Número de archivos eliminados por la operación de restauración.
numRestoredFiles Número de archivos que se agregaron como resultado de la restauración.
removedFilesSize Tamaño en bytes de los archivos eliminados por la restauración.
restoredFilesSize Tamaño en bytes de los archivos agregados por la restauración.
VACUUM
numDeletedFiles Número de archivos eliminados.
numVacuumedDirectories Número de directorios vaciados.
numFilesToDelete Número de archivos que se van a eliminar.

¿Qué es el viaje en el tiempo de Delta Lake?

El viaje en el tiempo de Delta Lake admite la consulta de versiones de tabla anteriores basadas en la marca de tiempo o la versión de tabla (como se registra en el registro de transacciones). Puede usar el viaje en el tiempo para aplicaciones como las siguientes:

  • Volver a crear análisis, informes o salidas (por ejemplo, la salida de un modelo de aprendizaje automático). Esto puede resultar útil para las depuraciones o las auditorías, en particular en los sectores regulados.
  • Escribir consultas temporales complejas.
  • Corregir errores en los datos.
  • Proporcionar aislamiento de instantáneas a un conjunto de consultas para tablas que cambian rápidamente.

Importante

Las versiones de tabla accesibles con el viaje en el tiempo se determinan mediante una combinación del umbral de retención para los archivos de registro de transacciones y la frecuencia y la retención especificada para las operaciones VACUUM. Si se ejecuta VACUUM diariamente con los valores predeterminados, hay 7 días de datos disponibles para el viaje en el tiempo.

Sintaxis del viaje en el tiempo de Delta

Para consultar una tabla Delta con viaje en el tiempo, agregue una cláusula después de la especificación del nombre de tabla.

  • El valor de timestamp_expression puede ser uno de los siguientes:
    • '2018-10-18T22:15:12.013Z', es decir, una cadena que se puede convertir en una marca de tiempo
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', es decir, una cadena de fecha.
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Cualquier otra expresión que sea una marca de tiempo o se pueda convertir en una
  • version es un valor largo que se puede obtener de la salida de DESCRIBE HISTORY table_spec.

timestamp_expression ni version pueden ser subconsultas.

Solo se aceptan cadenas de fecha o de hora. Por ejemplo, "2019-01-01" y "2019-01-01T00:00:00.000Z". Consulte el código siguiente para obtener una sintaxis de ejemplo:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

También puede usar la sintaxis @ para especificar la marca de tiempo o la versión como parte del nombre de la tabla. La marca de tiempo debe estar en formato yyyyMMddHHmmssSSS. Puede especificar una versión después de @ si antepone v a la versión. Consulte el código siguiente para obtener una sintaxis de ejemplo:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

¿Qué son los puntos de comprobación del registro de transacciones?

Delta Lake registra las versiones de las tablas como archivos JSON dentro del directorio _delta_log, que se almacena junto con los datos de las tablas. Para optimizar la consulta de los puntos de control, Delta Lake agrega las versiones de las tablas a los archivos de puntos de comprobación de Parquet, lo que evita la necesidad de leer todas las versiones JSON del historial de las tablas. Azure Databricks optimiza la frecuencia de puntos de comprobación para el tamaño y la carga de trabajo de los datos. Los usuarios no deberían tener que interactuar directamente con los puntos de comprobación. La frecuencia de los puntos de comprobación está sujeta a cambios sin previo aviso.

Configuración de la retención de datos para las consultas de viaje en el tiempo

Para consultar una versión de tabla anterior, debe conservar tanto el registro como los archivos de datos de esa versión.

Los archivos de datos se eliminan cuando se ejecuta VACUUM en una tabla. Delta Lake administra la eliminación automática de archivos de registro después de realizar puntos de comprobación de las versiones de la tabla.

Dado que la mayoría de las tablas de Delta se han ejecutado con VACUUM con regularidad, las consultas a un momento dado deben respetar el umbral de retención de VACUUM, que es de 7 días de manera predeterminada.

Para aumentar el umbral de retención de datos para las tablas de Delta, debe configurar las siguientes propiedades de tabla:

  • delta.logRetentionDuration = "interval <interval>": controla cuánto tiempo se conserva el historial de una tabla. El valor predeterminado es interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": determina los usos del umbral VACUUM para quitar los archivos de datos a los que ya no se hace referencia en la versión de la tabla actual. El valor predeterminado es interval 7 days.

Puede especificar las propiedades Delta durante la creación de la tabla o establecerlas con una instrucción ALTER TABLE. Consulte Referencia de propiedades de tabla Delta.

Nota:

Debe establecer ambas propiedades para asegurarse de que el historial de tablas se conserva durante más tiempo para las tablas con operaciones VACUUM frecuentes. Por ejemplo, para acceder a 30 días de datos históricos, establezca delta.deletedFileRetentionDuration = "interval 30 days" (que coincide con la configuración predeterminada de delta.logRetentionDuration).

Aumentar el umbral de retención de datos puede hacer que los costes de almacenamiento aumenten, a medida que se mantienen más archivos de datos.

Restauración de una tabla Delta a un estado anterior

Nota:

Disponible en Databricks Runtime 7.4 y versiones posteriores.

Puede restaurar una tabla Delta a su estado anterior mediante el comando RESTORE. Una tabla Delta mantiene internamente versiones históricas de la tabla que permiten restaurarla a un estado anterior. El comando RESTORE admite como opciones una versión correspondiente al estado anterior o una marca de tiempo de cuándo se creó el estado anterior.

Importante

  • Puede restaurar una tabla ya restaurada.
  • Puede restaurar una tabla clonada.
  • Debe tener el permiso MODIFY en la tabla que se va a restaurar.
  • No puede restaurar una tabla a una versión anterior en la que los archivos de datos se eliminaron manualmente o mediante vacuum. La restauración a esta versión parcialmente sigue siendo posible si spark.sql.files.ignoreMissingFiles se establece en true.
  • El formato de marca de tiempo para restaurar a un estado anterior es yyyy-MM-dd HH:mm:ss. También se admite proporcionar solo una cadena de fecha (yyyy-MM-dd).
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

Para más detalles sobre la sintaxis, consulte RESTORE.

Importante

La restauración se considera una operación de cambio de datos. Las entradas de registro de Delta Lake agregadas por el comando RESTORE contienen dataChange establecido en true. Si hay una aplicación de bajada, como un trabajo de streaming estructurado que procesa las actualizaciones de una tabla de Delta Lake, las entradas del registro de cambios de datos agregadas por la operación de restauración se consideran nuevas actualizaciones de datos y su procesamiento puede dar lugar a datos duplicados.

Por ejemplo:

Versión de tabla Operación Actualizaciones de registro Delta Registros en las actualizaciones del registro de cambios de datos
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (No hay registros, ya que optimizar la compactación no cambia los datos de la tabla).
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

En el ejemplo anterior, el comando RESTORE da como resultado actualizaciones que ya se vieron al leer la versión 0 y 1 de la tabla Delta. Si una consulta de streaming leyó esta tabla, estos archivos se considerarán como datos recién agregados y se procesarán de nuevo.

Métricas de restauración

RESTORE informa de las métricas siguientes como un dataframe de una sola fila una vez completada la operación:

  • table_size_after_restore: tamaño de la tabla después de la restauración.

  • num_of_files_after_restore: número de archivos de la tabla después de la restauración.

  • num_removed_files: número de archivos quitados (eliminados lógicamente) de la tabla.

  • num_restored_files: número de archivos restaurados debido a la reversión.

  • removed_files_size: tamaño total en bytes de los archivos que se han quitado de la tabla.

  • restored_files_size: tamaño total en bytes de los archivos que se han restaurado.

    Restore metrics example

Ejemplos de uso del viaje en tiempo de Delta Lake

  • Corregir las eliminaciones accidentales en una tabla para el usuario 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Corregir las actualizaciones incorrectas accidentales de una tabla:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Consultar el número de clientes nuevos agregados durante la última semana.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

¿Cómo puedo encontrar la versión de la última confirmación en la sesión de Spark?

Para obtener el número de versión de la última confirmación escrita por el elemento SparkSession actual en todos los subprocesos y todas las tablas, consulte la configuración spark.databricks.delta.lastCommitVersionInSession de SQL.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Si no ha realizado ninguna confirmación por parte de SparkSession, se devuelve un valor vacío al consultar la clave.

Nota:

Si comparte el mismo elemento SparkSession entre varios subprocesos, es similar a compartir una variable entre varios subprocesos; puede encontrarse con condiciones de carrera porque el valor de configuración se actualiza simultáneamente.