Compartir vía


Trabajar con el historial de tablas

Cada operación que modifica una tabla crea una nueva versión de tabla. Use la información del historial para auditar las operaciones, revertir una tabla o consultar una tabla en un momento dado mediante el desplazamiento de tiempo.

Nota:

Databricks no recomienda usar el historial de tablas como una solución de copia de seguridad a largo plazo para el archivado de datos. Use solo los últimos 7 días para las operaciones de viaje de tiempo, a menos que haya establecido configuraciones de retención de datos y registros en un valor mayor.

Recuperación del historial de tablas

Recupere información que incluya las operaciones, el usuario y la marca de tiempo de cada escritura en una tabla mediante la ejecución del history comando . Las operaciones se devuelven en orden cronológico inverso.

La retención del historial de tablas viene determinada por la configuración de tabla logRetentionDuration, que es de 30 días de manera predeterminada.

Nota:

Los distintos umbrales de retención controlan el historial de tabla y el viaje en el tiempo. Consulta ¿Qué es el viaje en el tiempo?.

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

Para obtener más información sobre la sintaxis de Spark SQL, consulte DESCRIBE HISTORY.

Para obtener información sobre la sintaxis de Scala, Java y Python, consulte la Documentación de la API de Delta Lake.

El Explorador de catálogos proporciona una vista visual de esta información detallada de la tabla y el historial. Además del esquema de la tabla y los datos de ejemplo, puede hacer clic en la pestaña History (Historial) para ver el historial de la tabla que se muestra con DESCRIBE HISTORY.

Esquema del historial

La salida de la operación history tiene las columnas siguientes.

Columna Tipo Descripción
Versión largo Versión de tabla generada por la operación.
marca de tiempo marca de tiempo Cuándo se ha confirmado esta versión.
ID de usuario cuerda / cadena Identificador del usuario que ejecutó la operación.
nombre de usuario cuerda / cadena Nombre del usuario que ejecutó la operación.
operación cuerda / cadena Nombre de la operación.
parámetros de operación mapa Parámetros de la operación (por ejemplo, predicados).
trabajo Estructura Detalles del trabajo que ejecutó la operación.
cuaderno Estructura Detalles del cuaderno desde el que se ha ejecutado la operación.
clusterId cuerda / cadena Identificador del clúster en el que se ejecutó la operación.
versión de lectura largo Versión de la tabla que se leyó para realizar la operación de escritura.
nivel de aislamiento cuerda / cadena Nivel de aislamiento utilizado para esta operación.
isBlindAppend booleano Indica si esta operación ha anexado datos.
operationMetrics mapa Métricas de la operación (por ejemplo, el número de filas y archivos modificados).
metadatosDeUsuario cuerda / cadena Metadatos de confirmación definidos por el usuario, si se especificaron
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Nota:

Descripción partitionBy de los parámetros de operación

El partitionBy campo solo es significativo para las operaciones CREATE y OVERWRITE que definen o cambian el esquema de partición de una tabla.

Para las operaciones de anexión a tablas existentes (APPEND, INSERT, , UPDATEDELETE, MERGE), este campo podría mostrar una matriz [] vacía o columnas de partición en función del método de escritura usado (.save() frente .saveAsTable()a ). Esta incoherencia es el comportamiento esperado y no debe usarse para validar las escrituras.

Importante

No confíe en partitionBy el historial para validar las operaciones de anexión. El valor varía en función de los detalles de implementación, pero no afecta a cómo se escriben los datos en las particiones.

Example

Considere una tabla particionada por la columna date.

# Initial table creation - partitionBy is populated
df.write.format("delta") \
  .partitionBy("date") \
  .saveAsTable("sales_data")

La operación CREATE en el historial muestra:

operationParameters: {
  "mode": "ErrorIfExists",
  "partitionBy": "[\"date\"]"
}

Al anexar datos a esta tabla:

# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
  .mode("append") \
  .saveAsTable("sales_data")

La operación APPEND muestra:

operationParameters: {
  "mode": "Append",
  "partitionBy": "[]"
}

Se espera el valor vacío partitionBy. Los datos se siguen escribiendo en las particiones correctas según el esquema de partición existente de la tabla. Tenga en cuenta que .save() en una ruta de acceso puede mostrar columnas de partición en este campo, pero esta diferencia es un detalle de implementación y no afecta al comportamiento de escritura.

Métricas de operación

La operación history devuelve una colección de métricas de operaciones en la asignación de columnas operationMetrics.

En las tablas siguientes, se muestran las definiciones de clave de asignación por operación.

Operación Nombre de métrica Descripción
ESCRIBIR, CREATE TABLE COMO SELECT, SUSTITUIR TABLE COMO SELECT, COPY INTO
numFiles Número de archivos escritos.
numOutputBytes Tamaño en bytes del contenido escrito.
numOutputRows Número de filas escritas.
streaming UPDATE
numAddedFiles Número de archivos agregados.
numArchivosEliminados Número de archivos eliminados.
numOutputRows Número de filas escritas.
numOutputBytes Tamaño de escritura en bytes.
Delete
numAddedFiles Número de archivos agregados. No se proporciona cuando se eliminan las particiones de la tabla.
numArchivosEliminados Número de archivos eliminados.
numDeletedRows Número de filas eliminadas. No se proporciona cuando se eliminan las particiones de la tabla.
numCopiedRows Número de filas copiadas en el proceso de eliminación de archivos.
executionTimeMs Tiempo que se necesitó para ejecutar toda la operación.
scanTimeMs Tiempo que se necesitó para examinar los archivos en busca de coincidencias.
rewriteTimeMs Tiempo que se necesitó para volver a escribir los archivos coincidentes.
TRUNCAR
numArchivosEliminados Número de archivos eliminados.
executionTimeMs Tiempo que se necesitó para ejecutar toda la operación.
FUSIÓN
numSourceRows Número de filas del DataFrame de origen.
numTargetRowsInserted Número de filas insertadas en la tabla de destino.
numTargetRowsUpdated Número de filas actualizadas en la tabla de destino.
numFilasObjetivoEliminadas Número de filas eliminadas en la tabla de destino.
numTargetRowsCopied Número de filas de destino copiadas.
numOutputRows Número total de filas escritas.
numArchivosObjetivoAñadidos Número de archivos agregados al receptor (destino).
númeroDeArchivosObjetivoEliminados Número de archivos eliminados del receptor (destino).
executionTimeMs Tiempo que se necesitó para ejecutar toda la operación.
scanTimeMs Tiempo que se necesitó para examinar los archivos en busca de coincidencias.
rewriteTimeMs Tiempo que se necesitó para volver a escribir los archivos coincidentes.
UPDATE
numAddedFiles Número de archivos agregados.
numArchivosEliminados Número de archivos eliminados.
numUpdatedRows Número de filas actualizadas.
numCopiedRows Número de filas que se acaban de copiar en el proceso de actualización de archivos.
executionTimeMs Tiempo que se necesitó para ejecutar toda la operación.
scanTimeMs Tiempo que se necesitó para examinar los archivos en busca de coincidencias.
rewriteTimeMs Tiempo que se necesitó para volver a escribir los archivos coincidentes.
FSCK numArchivosEliminados Número de archivos eliminados.
CONVERTIR numConvertedFiles Número de archivos Parquet que se han convertido.
OPTIMIZE
numAddedFiles Número de archivos agregados.
numArchivosEliminados Número de archivos optimizados.
numAddedBytes Número de bytes agregados después de optimizar la tabla.
NúmeroDeBytesEliminados Número de bytes eliminados.
minFileSize Tamaño del archivo más pequeño después de optimizar la tabla.
p25FileSize Tamaño del archivo del percentil 25 después de optimizar la tabla.
p50FileSize Tamaño medio del archivo después de optimizar la tabla.
p75TamañoDeArchivo Tamaño del archivo del percentil 75 después de optimizar la tabla.
tamaño máximo de archivo Tamaño del archivo más grande después de optimizar la tabla.
CLON
sourceTableSize Tamaño en bytes de la tabla de origen en la versión que se clona.
sourceNumOfFiles Número de archivos de la tabla de origen en la versión que se clona.
numArchivosEliminados Número de archivos quitados de la tabla de destino si se ha reemplazado una tabla anterior.
removedFilesSize Tamaño total en bytes de los archivos quitados de la tabla de destino si se reemplazó una tabla anterior.
númeroDeArchivosCopiados Número de archivos que se copiaron en la nueva ubicación. 0 para clones superficiales.
copiedFilesSize Tamaño total en bytes de los archivos que se copiaron en la nueva ubicación. 0 para clones superficiales.
RESTORE
tamañoDeTablaDespuésDeRestaurar Tamaño de la tabla en bytes después de la restauración.
numArchivosDespuésDeRestaurar Número de archivos de la tabla después de la restauración.
numArchivosEliminados Número de archivos eliminados por la operación de restauración.
numRestoredFiles Número de archivos que se agregaron como resultado de la restauración.
removedFilesSize Tamaño en bytes de los archivos eliminados por la restauración.
tamañoDeArchivosRestaurados Tamaño en bytes de los archivos agregados por la restauración.
VACUUM
cantidadDeArchivosEliminados Número de archivos eliminados.
numVacuumedDirectories Número de directorios vaciados.
númeroDeArchivosParaEliminar Número de archivos que se van a eliminar.

¿Qué es el viaje en el tiempo?

El viaje en el tiempo admite la consulta de versiones anteriores de tablas basadas en la marca de tiempo o en la versión de la tabla, según se registra en el registro de transacciones. Puede usar el viaje en el tiempo para aplicaciones como las siguientes:

  • Volver a crear análisis, informes o salidas (por ejemplo, la salida de un modelo de aprendizaje automático). Esto puede resultar útil para las depuraciones o las auditorías, en particular en los sectores regulados.
  • Escribir consultas temporales complejas.
  • Corregir errores en los datos.
  • Proporcionar aislamiento de instantáneas a un conjunto de consultas para tablas que cambian rápidamente.

Importante

En Databricks Runtime 18.0 y versiones posteriores, las consultas de viaje en el tiempo se bloquean si solicitan una versión anterior a la propiedad de la tabla deletedFileRetentionDuration, con un valor predeterminado de 7 días. En el caso de las tablas administradas por el catálogo de Unity, esto se aplica a Databricks Runtime 12.2 y versiones posteriores.

Sintaxis de viaje en el tiempo

Para consultar una tabla con desplazamiento de tiempo, agregue una cláusula después de la especificación de nombre de tabla.

  • El valor de timestamp_expression puede ser uno de los siguientes:
    • '2018-10-18T22:15:12.013Z', es decir, una cadena que se puede convertir en una marca de tiempo
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', es decir, una cadena de fecha.
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Cualquier otra expresión que sea una marca de tiempo o se pueda convertir en una
  • version es un valor largo que se puede obtener de la salida de DESCRIBE HISTORY table_spec.

timestamp_expression ni version pueden ser subconsultas.

Solo se aceptan cadenas de fecha o de hora. Por ejemplo, "2019-01-01" y "2019-01-01T00:00:00.000Z". Consulte el código siguiente para obtener una sintaxis de ejemplo:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Pitón

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

También puede usar la sintaxis @ para especificar la marca de tiempo o la versión como parte del nombre de la tabla. La marca de tiempo debe estar en formato yyyyMMddHHmmssSSS. Puede especificar una versión después de @ si antepone v a la versión. Consulte el código siguiente para obtener una sintaxis de ejemplo:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Pitón

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

¿Qué son los puntos de comprobación del registro de transacciones?

Las versiones de tabla se registran como archivos JSON dentro del directorio del registro de transacciones, que se almacena junto con los datos de la tabla. Para optimizar la consulta de puntos de comprobación, las versiones de tabla se agregan a los archivos de punto de comprobación de Parquet, lo que impide la necesidad de leer todas las versiones JSON del historial de tablas. Azure Databricks optimiza la frecuencia de puntos de comprobación para el tamaño y la carga de trabajo de los datos. Los usuarios no deberían tener que interactuar directamente con los puntos de comprobación. La frecuencia de los puntos de comprobación está sujeta a cambios sin previo aviso.

Configuración de la retención de datos para las consultas de viaje en el tiempo

Para consultar una versión de tabla anterior, debe conservar tanto el registro como los archivos de datos de esa versión.

Los archivos de datos se eliminan cuando se ejecuta VACUUM en una tabla. La eliminación de archivos de registro se administra automáticamente después del establecimiento de puntos de control de las versiones de la tabla.

Dado que la mayoría de las tablas tienen procesos VACUUM ejecutados sobre ellas con regularidad, las consultas en un momento dado deben respetar el umbral de retención para VACUUM, que es de 7 días de forma predeterminada.

Para aumentar el umbral de retención de datos para las tablas, debe configurar las siguientes propiedades de tabla:

  • delta.logRetentionDuration = "interval <interval>": controla cuánto tiempo se conserva el historial de una tabla. El valor predeterminado es interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": determina los usos del umbral VACUUM para quitar los archivos de datos a los que ya no se hace referencia en la versión de la tabla actual. El valor predeterminado es interval 7 days.

Puede especificar propiedades de tabla durante la creación de tablas o establecerlas con una ALTER TABLE instrucción . Consulte Referencia de propiedades de tabla.

Nota:

En Databricks Runtime 18.0 y versiones posteriores, logRetentionDuration debe ser mayor o igual que deletedFileRetentionDuration. En el caso de las tablas administradas por el catálogo de Unity, esto se aplica a Databricks Runtime 12.2 y versiones posteriores.

Para acceder a 30 días de datos históricos, establezca delta.deletedFileRetentionDuration = "interval 30 days" (que coincide con la configuración predeterminada de delta.logRetentionDuration).

Aumentar el umbral de retención de datos puede hacer que los costes de almacenamiento aumenten, a medida que se mantienen más archivos de datos.

Restauración de una tabla a un estado anterior

Puede restaurar una tabla a su estado anterior mediante el RESTORE comando . Las tablas mantienen internamente versiones históricas que permiten restaurarlas a un estado anterior. El comando RESTORE admite como opciones una versión correspondiente al estado anterior o una marca de tiempo de cuándo se creó el estado anterior.

Importante

  • Puede restaurar una tabla ya restaurada.
  • Puede restaurar una tabla clonada.
  • Debe tener el permiso MODIFY en la tabla que se va a restaurar.
  • No puede restaurar una tabla a una versión anterior en la que los archivos de datos se eliminaron manualmente o mediante vacuum. La restauración a esta versión parcialmente sigue siendo posible si spark.sql.files.ignoreMissingFiles se establece en true.
  • El formato de marca de tiempo para restaurar a un estado anterior es yyyy-MM-dd HH:mm:ss. También se admite proporcionar solo una cadena de fecha (yyyy-MM-dd).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

Para obtener más información sobre la sintaxis, consulte RESTORE.

Importante

La restauración se considera una operación de cambio de datos. Las entradas de registro agregadas por el RESTORE comando contienen dataChange establecido en true. Si hay una aplicación downstream, como un trabajo de streaming estructurado que procesa las actualizaciones de una tabla, las entradas del log de cambios de datos agregadas por la operación de restauración se consideran nuevas actualizaciones de datos y su procesamiento puede dar lugar a datos duplicados.

Por ejemplo:

Versión de tabla Operación Actualizaciones de registro Registros en las actualizaciones del registro de cambios de datos
0 INSERT AddFile(/path/to/file-1, dataChange = true) (nombre = Viktor, edad = 29, (nombre = George, edad = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (name = George, age = 39)
2 OPTIMIZE AgregarArchivo(/path/to/file-3, dataChange = false), EliminarArchivo(/path/to/file-1), EliminarArchivo(/path/to/file-2) (No hay registros, ya que optimizar la compactación no cambia los datos de la tabla).
3 RESTORE(versión=1) RemoveFile(/ruta/al/archivo-3), AddFile(/ruta/al/archivo-1, cambioDeDatos = verdadero), AddFile(/ruta/al/archivo-2, cambioDeDatos = verdadero) (nombre = Viktor, edad = 29), (nombre = George, edad = 55), (nombre = George, edad = 39)

En el ejemplo anterior, el RESTORE comando da como resultado actualizaciones que ya se vieron al leer la versión 0 y 1 de la tabla. Si una consulta de streaming leyó esta tabla, estos archivos se considerarán como datos recién agregados y se procesarán de nuevo.

Métricas de restauración

RESTORE informa de las métricas siguientes como un dataframe de una sola fila una vez completada la operación:

  • table_size_after_restore: tamaño de la tabla después de la restauración.

  • num_of_files_after_restore: número de archivos de la tabla después de la restauración.

  • num_removed_files: número de archivos quitados (eliminados lógicamente) de la tabla.

  • num_restored_files: número de archivos restaurados debido a la reversión.

  • removed_files_size: tamaño total en bytes de los archivos que se han quitado de la tabla.

  • restored_files_size: tamaño total en bytes de los archivos que se han restaurado.

    Ejemplo de métricas de restauración

Ejemplos de uso del viaje en el tiempo

  • Corregir las eliminaciones accidentales en una tabla para el usuario 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Corregir las actualizaciones incorrectas accidentales de una tabla:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Consultar el número de clientes nuevos agregados durante la última semana.

    SELECT
    (
      SELECT count(distinct userId)
      FROM my_table
    )
    -
    (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)
    ) AS new_customers
    

¿Cómo encuentro la última versión de confirmación en la sesión de Spark?

Para obtener el número de versión de la última confirmación escrita por el elemento SparkSession actual en todos los subprocesos y todas las tablas, consulte la configuración spark.databricks.delta.lastCommitVersionInSession de SQL.

Nota:

En el caso de las tablas de Apache Iceberg, use spark.databricks.iceberg.lastCommitVersionInSession en lugar de spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Pitón

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Si no ha realizado ninguna confirmación por parte de SparkSession, se devuelve un valor vacío al consultar la clave.

Nota:

Si comparte lo mismo SparkSession en varios subprocesos, es similar a compartir una variable entre varios subprocesos; puede alcanzar condiciones de carrera a medida que el valor de configuración se actualiza simultáneamente.