Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Cada operación que modifica una tabla crea una nueva versión de tabla. Use la información del historial para auditar las operaciones, revertir una tabla o consultar una tabla en un momento dado mediante el desplazamiento de tiempo.
Nota:
Databricks no recomienda usar el historial de tablas como una solución de copia de seguridad a largo plazo para el archivado de datos. Use solo los últimos 7 días para las operaciones de viaje de tiempo, a menos que haya establecido configuraciones de retención de datos y registros en un valor mayor.
Recuperación del historial de tablas
Recupere información que incluya las operaciones, el usuario y la marca de tiempo de cada escritura en una tabla mediante la ejecución del history comando . Las operaciones se devuelven en orden cronológico inverso.
La retención del historial de tablas viene determinada por la configuración de tabla logRetentionDuration, que es de 30 días de manera predeterminada.
Nota:
Los distintos umbrales de retención controlan el historial de tabla y el viaje en el tiempo. Consulta ¿Qué es el viaje en el tiempo?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Para obtener más información sobre la sintaxis de Spark SQL, consulte DESCRIBE HISTORY.
Para obtener información sobre la sintaxis de Scala, Java y Python, consulte la Documentación de la API de Delta Lake.
El Explorador de catálogos proporciona una vista visual de esta información detallada de la tabla y el historial. Además del esquema de la tabla y los datos de ejemplo, puede hacer clic en la pestaña History (Historial) para ver el historial de la tabla que se muestra con DESCRIBE HISTORY.
Esquema del historial
La salida de la operación history tiene las columnas siguientes.
| Columna | Tipo | Descripción |
|---|---|---|
| Versión | largo | Versión de tabla generada por la operación. |
| marca de tiempo | marca de tiempo | Cuándo se ha confirmado esta versión. |
| ID de usuario | cuerda / cadena | Identificador del usuario que ejecutó la operación. |
| nombre de usuario | cuerda / cadena | Nombre del usuario que ejecutó la operación. |
| operación | cuerda / cadena | Nombre de la operación. |
| parámetros de operación | mapa | Parámetros de la operación (por ejemplo, predicados). |
| trabajo | Estructura | Detalles del trabajo que ejecutó la operación. |
| cuaderno | Estructura | Detalles del cuaderno desde el que se ha ejecutado la operación. |
| clusterId | cuerda / cadena | Identificador del clúster en el que se ejecutó la operación. |
| versión de lectura | largo | Versión de la tabla que se leyó para realizar la operación de escritura. |
| nivel de aislamiento | cuerda / cadena | Nivel de aislamiento utilizado para esta operación. |
| isBlindAppend | booleano | Indica si esta operación ha anexado datos. |
| operationMetrics | mapa | Métricas de la operación (por ejemplo, el número de filas y archivos modificados). |
| metadatosDeUsuario | cuerda / cadena | Metadatos de confirmación definidos por el usuario, si se especificaron |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Nota:
- Algunas de las otras columnas no están disponibles si escribe en una tabla mediante los métodos siguientes:
- Las columnas agregadas en el futuro siempre se agregarán después de la última columna.
Descripción partitionBy de los parámetros de operación
El partitionBy campo solo es significativo para las operaciones CREATE y OVERWRITE que definen o cambian el esquema de partición de una tabla.
Para las operaciones de anexión a tablas existentes (APPEND, INSERT, , UPDATEDELETE, MERGE), este campo podría mostrar una matriz [] vacía o columnas de partición en función del método de escritura usado (.save() frente .saveAsTable()a ). Esta incoherencia es el comportamiento esperado y no debe usarse para validar las escrituras.
Importante
No confíe en partitionBy el historial para validar las operaciones de anexión. El valor varía en función de los detalles de implementación, pero no afecta a cómo se escriben los datos en las particiones.
Example
Considere una tabla particionada por la columna date.
# Initial table creation - partitionBy is populated
df.write.format("delta") \
.partitionBy("date") \
.saveAsTable("sales_data")
La operación CREATE en el historial muestra:
operationParameters: {
"mode": "ErrorIfExists",
"partitionBy": "[\"date\"]"
}
Al anexar datos a esta tabla:
# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
.mode("append") \
.saveAsTable("sales_data")
La operación APPEND muestra:
operationParameters: {
"mode": "Append",
"partitionBy": "[]"
}
Se espera el valor vacío partitionBy. Los datos se siguen escribiendo en las particiones correctas según el esquema de partición existente de la tabla. Tenga en cuenta que .save() en una ruta de acceso puede mostrar columnas de partición en este campo, pero esta diferencia es un detalle de implementación y no afecta al comportamiento de escritura.
Métricas de operación
La operación history devuelve una colección de métricas de operaciones en la asignación de columnas operationMetrics.
En las tablas siguientes, se muestran las definiciones de clave de asignación por operación.
| Operación | Nombre de métrica | Descripción |
|---|---|---|
| ESCRIBIR, CREATE TABLE COMO SELECT, SUSTITUIR TABLE COMO SELECT, COPY INTO | ||
| numFiles | Número de archivos escritos. | |
| numOutputBytes | Tamaño en bytes del contenido escrito. | |
| numOutputRows | Número de filas escritas. | |
| streaming UPDATE | ||
| numAddedFiles | Número de archivos agregados. | |
| numArchivosEliminados | Número de archivos eliminados. | |
| numOutputRows | Número de filas escritas. | |
| numOutputBytes | Tamaño de escritura en bytes. | |
| Delete | ||
| numAddedFiles | Número de archivos agregados. No se proporciona cuando se eliminan las particiones de la tabla. | |
| numArchivosEliminados | Número de archivos eliminados. | |
| numDeletedRows | Número de filas eliminadas. No se proporciona cuando se eliminan las particiones de la tabla. | |
| numCopiedRows | Número de filas copiadas en el proceso de eliminación de archivos. | |
| executionTimeMs | Tiempo que se necesitó para ejecutar toda la operación. | |
| scanTimeMs | Tiempo que se necesitó para examinar los archivos en busca de coincidencias. | |
| rewriteTimeMs | Tiempo que se necesitó para volver a escribir los archivos coincidentes. | |
| TRUNCAR | ||
| numArchivosEliminados | Número de archivos eliminados. | |
| executionTimeMs | Tiempo que se necesitó para ejecutar toda la operación. | |
| FUSIÓN | ||
| numSourceRows | Número de filas del DataFrame de origen. | |
| numTargetRowsInserted | Número de filas insertadas en la tabla de destino. | |
| numTargetRowsUpdated | Número de filas actualizadas en la tabla de destino. | |
| numFilasObjetivoEliminadas | Número de filas eliminadas en la tabla de destino. | |
| numTargetRowsCopied | Número de filas de destino copiadas. | |
| numOutputRows | Número total de filas escritas. | |
| numArchivosObjetivoAñadidos | Número de archivos agregados al receptor (destino). | |
| númeroDeArchivosObjetivoEliminados | Número de archivos eliminados del receptor (destino). | |
| executionTimeMs | Tiempo que se necesitó para ejecutar toda la operación. | |
| scanTimeMs | Tiempo que se necesitó para examinar los archivos en busca de coincidencias. | |
| rewriteTimeMs | Tiempo que se necesitó para volver a escribir los archivos coincidentes. | |
| UPDATE | ||
| numAddedFiles | Número de archivos agregados. | |
| numArchivosEliminados | Número de archivos eliminados. | |
| numUpdatedRows | Número de filas actualizadas. | |
| numCopiedRows | Número de filas que se acaban de copiar en el proceso de actualización de archivos. | |
| executionTimeMs | Tiempo que se necesitó para ejecutar toda la operación. | |
| scanTimeMs | Tiempo que se necesitó para examinar los archivos en busca de coincidencias. | |
| rewriteTimeMs | Tiempo que se necesitó para volver a escribir los archivos coincidentes. | |
| FSCK | numArchivosEliminados | Número de archivos eliminados. |
| CONVERTIR | numConvertedFiles | Número de archivos Parquet que se han convertido. |
| OPTIMIZE | ||
| numAddedFiles | Número de archivos agregados. | |
| numArchivosEliminados | Número de archivos optimizados. | |
| numAddedBytes | Número de bytes agregados después de optimizar la tabla. | |
| NúmeroDeBytesEliminados | Número de bytes eliminados. | |
| minFileSize | Tamaño del archivo más pequeño después de optimizar la tabla. | |
| p25FileSize | Tamaño del archivo del percentil 25 después de optimizar la tabla. | |
| p50FileSize | Tamaño medio del archivo después de optimizar la tabla. | |
| p75TamañoDeArchivo | Tamaño del archivo del percentil 75 después de optimizar la tabla. | |
| tamaño máximo de archivo | Tamaño del archivo más grande después de optimizar la tabla. | |
| CLON | ||
| sourceTableSize | Tamaño en bytes de la tabla de origen en la versión que se clona. | |
| sourceNumOfFiles | Número de archivos de la tabla de origen en la versión que se clona. | |
| numArchivosEliminados | Número de archivos quitados de la tabla de destino si se ha reemplazado una tabla anterior. | |
| removedFilesSize | Tamaño total en bytes de los archivos quitados de la tabla de destino si se reemplazó una tabla anterior. | |
| númeroDeArchivosCopiados | Número de archivos que se copiaron en la nueva ubicación. 0 para clones superficiales. | |
| copiedFilesSize | Tamaño total en bytes de los archivos que se copiaron en la nueva ubicación. 0 para clones superficiales. | |
| RESTORE | ||
| tamañoDeTablaDespuésDeRestaurar | Tamaño de la tabla en bytes después de la restauración. | |
| numArchivosDespuésDeRestaurar | Número de archivos de la tabla después de la restauración. | |
| numArchivosEliminados | Número de archivos eliminados por la operación de restauración. | |
| numRestoredFiles | Número de archivos que se agregaron como resultado de la restauración. | |
| removedFilesSize | Tamaño en bytes de los archivos eliminados por la restauración. | |
| tamañoDeArchivosRestaurados | Tamaño en bytes de los archivos agregados por la restauración. | |
| VACUUM | ||
| cantidadDeArchivosEliminados | Número de archivos eliminados. | |
| numVacuumedDirectories | Número de directorios vaciados. | |
| númeroDeArchivosParaEliminar | Número de archivos que se van a eliminar. |
¿Qué es el viaje en el tiempo?
El viaje en el tiempo admite la consulta de versiones anteriores de tablas basadas en la marca de tiempo o en la versión de la tabla, según se registra en el registro de transacciones. Puede usar el viaje en el tiempo para aplicaciones como las siguientes:
- Volver a crear análisis, informes o salidas (por ejemplo, la salida de un modelo de aprendizaje automático). Esto puede resultar útil para las depuraciones o las auditorías, en particular en los sectores regulados.
- Escribir consultas temporales complejas.
- Corregir errores en los datos.
- Proporcionar aislamiento de instantáneas a un conjunto de consultas para tablas que cambian rápidamente.
Importante
En Databricks Runtime 18.0 y versiones posteriores, las consultas de viaje en el tiempo se bloquean si solicitan una versión anterior a la propiedad de la tabla deletedFileRetentionDuration, con un valor predeterminado de 7 días. En el caso de las tablas administradas por el catálogo de Unity, esto se aplica a Databricks Runtime 12.2 y versiones posteriores.
Sintaxis de viaje en el tiempo
Para consultar una tabla con desplazamiento de tiempo, agregue una cláusula después de la especificación de nombre de tabla.
- El valor de
timestamp_expressionpuede ser uno de los siguientes:-
'2018-10-18T22:15:12.013Z', es decir, una cadena que se puede convertir en una marca de tiempo cast('2018-10-18 13:36:32 CEST' as timestamp)-
'2018-10-18', es decir, una cadena de fecha. current_timestamp() - interval 12 hoursdate_sub(current_date(), 1)- Cualquier otra expresión que sea una marca de tiempo o se pueda convertir en una
-
-
versiones un valor largo que se puede obtener de la salida deDESCRIBE HISTORY table_spec.
timestamp_expression ni version pueden ser subconsultas.
Solo se aceptan cadenas de fecha o de hora. Por ejemplo, "2019-01-01" y "2019-01-01T00:00:00.000Z". Consulte el código siguiente para obtener una sintaxis de ejemplo:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Pitón
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
También puede usar la sintaxis @ para especificar la marca de tiempo o la versión como parte del nombre de la tabla. La marca de tiempo debe estar en formato yyyyMMddHHmmssSSS. Puede especificar una versión después de @ si antepone v a la versión. Consulte el código siguiente para obtener una sintaxis de ejemplo:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Pitón
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
¿Qué son los puntos de comprobación del registro de transacciones?
Las versiones de tabla se registran como archivos JSON dentro del directorio del registro de transacciones, que se almacena junto con los datos de la tabla. Para optimizar la consulta de puntos de comprobación, las versiones de tabla se agregan a los archivos de punto de comprobación de Parquet, lo que impide la necesidad de leer todas las versiones JSON del historial de tablas. Azure Databricks optimiza la frecuencia de puntos de comprobación para el tamaño y la carga de trabajo de los datos. Los usuarios no deberían tener que interactuar directamente con los puntos de comprobación. La frecuencia de los puntos de comprobación está sujeta a cambios sin previo aviso.
Configuración de la retención de datos para las consultas de viaje en el tiempo
Para consultar una versión de tabla anterior, debe conservar tanto el registro como los archivos de datos de esa versión.
Los archivos de datos se eliminan cuando se ejecuta VACUUM en una tabla. La eliminación de archivos de registro se administra automáticamente después del establecimiento de puntos de control de las versiones de la tabla.
Dado que la mayoría de las tablas tienen procesos VACUUM ejecutados sobre ellas con regularidad, las consultas en un momento dado deben respetar el umbral de retención para VACUUM, que es de 7 días de forma predeterminada.
Para aumentar el umbral de retención de datos para las tablas, debe configurar las siguientes propiedades de tabla:
-
delta.logRetentionDuration = "interval <interval>": controla cuánto tiempo se conserva el historial de una tabla. El valor predeterminado esinterval 30 days. -
delta.deletedFileRetentionDuration = "interval <interval>": determina los usos del umbralVACUUMpara quitar los archivos de datos a los que ya no se hace referencia en la versión de la tabla actual. El valor predeterminado esinterval 7 days.
Puede especificar propiedades de tabla durante la creación de tablas o establecerlas con una ALTER TABLE instrucción . Consulte Referencia de propiedades de tabla.
Nota:
En Databricks Runtime 18.0 y versiones posteriores, logRetentionDuration debe ser mayor o igual que deletedFileRetentionDuration. En el caso de las tablas administradas por el catálogo de Unity, esto se aplica a Databricks Runtime 12.2 y versiones posteriores.
Para acceder a 30 días de datos históricos, establezca delta.deletedFileRetentionDuration = "interval 30 days" (que coincide con la configuración predeterminada de delta.logRetentionDuration).
Aumentar el umbral de retención de datos puede hacer que los costes de almacenamiento aumenten, a medida que se mantienen más archivos de datos.
Restauración de una tabla a un estado anterior
Puede restaurar una tabla a su estado anterior mediante el RESTORE comando . Las tablas mantienen internamente versiones históricas que permiten restaurarlas a un estado anterior.
El comando RESTORE admite como opciones una versión correspondiente al estado anterior o una marca de tiempo de cuándo se creó el estado anterior.
Importante
- Puede restaurar una tabla ya restaurada.
- Puede restaurar una tabla clonada.
- Debe tener el permiso
MODIFYen la tabla que se va a restaurar. - No puede restaurar una tabla a una versión anterior en la que los archivos de datos se eliminaron manualmente o mediante
vacuum. La restauración a esta versión parcialmente sigue siendo posible sispark.sql.files.ignoreMissingFilesse establece entrue. - El formato de marca de tiempo para restaurar a un estado anterior es
yyyy-MM-dd HH:mm:ss. También se admite proporcionar solo una cadena de fecha (yyyy-MM-dd).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Para obtener más información sobre la sintaxis, consulte RESTORE.
Importante
La restauración se considera una operación de cambio de datos. Las entradas de registro agregadas por el RESTORE comando contienen dataChange establecido en true. Si hay una aplicación downstream, como un trabajo de streaming estructurado que procesa las actualizaciones de una tabla, las entradas del log de cambios de datos agregadas por la operación de restauración se consideran nuevas actualizaciones de datos y su procesamiento puede dar lugar a datos duplicados.
Por ejemplo:
| Versión de tabla | Operación | Actualizaciones de registro | Registros en las actualizaciones del registro de cambios de datos |
|---|---|---|---|
| 0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (nombre = Viktor, edad = 29, (nombre = George, edad = 55) |
| 1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
| 2 | OPTIMIZE | AgregarArchivo(/path/to/file-3, dataChange = false), EliminarArchivo(/path/to/file-1), EliminarArchivo(/path/to/file-2) | (No hay registros, ya que optimizar la compactación no cambia los datos de la tabla). |
| 3 | RESTORE(versión=1) | RemoveFile(/ruta/al/archivo-3), AddFile(/ruta/al/archivo-1, cambioDeDatos = verdadero), AddFile(/ruta/al/archivo-2, cambioDeDatos = verdadero) | (nombre = Viktor, edad = 29), (nombre = George, edad = 55), (nombre = George, edad = 39) |
En el ejemplo anterior, el RESTORE comando da como resultado actualizaciones que ya se vieron al leer la versión 0 y 1 de la tabla. Si una consulta de streaming leyó esta tabla, estos archivos se considerarán como datos recién agregados y se procesarán de nuevo.
Métricas de restauración
RESTORE informa de las métricas siguientes como un dataframe de una sola fila una vez completada la operación:
table_size_after_restore: tamaño de la tabla después de la restauración.num_of_files_after_restore: número de archivos de la tabla después de la restauración.num_removed_files: número de archivos quitados (eliminados lógicamente) de la tabla.num_restored_files: número de archivos restaurados debido a la reversión.removed_files_size: tamaño total en bytes de los archivos que se han quitado de la tabla.restored_files_size: tamaño total en bytes de los archivos que se han restaurado.
Ejemplos de uso del viaje en el tiempo
Corregir las eliminaciones accidentales en una tabla para el usuario
111:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111Corregir las actualizaciones incorrectas accidentales de una tabla:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *Consultar el número de clientes nuevos agregados durante la última semana.
SELECT ( SELECT count(distinct userId) FROM my_table ) - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7) ) AS new_customers
¿Cómo encuentro la última versión de confirmación en la sesión de Spark?
Para obtener el número de versión de la última confirmación escrita por el elemento SparkSession actual en todos los subprocesos y todas las tablas, consulte la configuración spark.databricks.delta.lastCommitVersionInSession de SQL.
Nota:
En el caso de las tablas de Apache Iceberg, use spark.databricks.iceberg.lastCommitVersionInSession en lugar de spark.databricks.delta.lastCommitVersionInSession.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Pitón
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Si no ha realizado ninguna confirmación por parte de SparkSession, se devuelve un valor vacío al consultar la clave.
Nota:
Si comparte lo mismo SparkSession en varios subprocesos, es similar a compartir una variable entre varios subprocesos; puede alcanzar condiciones de carrera a medida que el valor de configuración se actualiza simultáneamente.