Работа с журналом таблиц Delta Lake
Каждая операция, которая изменяет таблицу Delta Lake, создает новую версию таблицы. Сведения журнала можно использовать для аудита операций, отката таблицы или запроса таблицы в определенный момент времени с помощью перемещения по времени.
Примечание.
Databricks не рекомендует использовать журнал таблиц Delta Lake в качестве долгосрочного решения резервного копирования для архивации данных. Databricks рекомендует использовать только последние 7 дней для операций перехода по времени, если только вы не установили большее значение для конфигураций хранения данных и журналов.
Получение журнала таблицы Delta
Вы можете получить сведения, включая операции, пользователя и метку времени для каждой записи в таблицу Delta, выполнив history
команду. Операции возвращаются в обратном хронологическом порядке.
Хранение журнала таблиц определяется параметром delta.logRetentionDuration
таблицы, который составляет 30 дней по умолчанию.
Примечание.
Журнал перехода по времени и таблиц управляется различными пороговыми значения для хранения. См. статью Что такое переход по времени Delta Lake?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Сведения о синтаксисе Spark SQL см. в разделе "ОПИСАНИЕ ЖУРНАЛА".
Дополнительные сведения о синтаксисе Scala/Java/Python см. в документации по API Delta Lake.
Обозреватель каталогов предоставляет визуальное представление этой подробной информации о таблице и журнале для таблиц Delta. Помимо схемы таблицы и выборки данных можно также щелкнуть вкладку Журнал, чтобы просмотреть журнал таблицы, который отображается с DESCRIBE HISTORY
.
Схема журнала
Выходные данные операции history
имеют указанніе ниже столбцы.
Column | Type | Описание |
---|---|---|
версия | длинный | Версия таблицы, созданная операцией. |
TIMESTAMP | Метка времени | Когда версия была зафиксирована. |
userId | строка | Идентификатор пользователя, запустившего операцию. |
userName | строка | Имя пользователя, запустившего операцию. |
Операция | строка | Имя операции. |
operationParameters | map | Параметры операции (например, предикаты). |
задание | struct | Сведения о задании, которое запустило операцию. |
записная книжка | struct | Сведения о записной книжке, из которой выполнялась операция. |
clusterId | строка | Идентификатор кластера, в котором выполнялась операция. |
readVersion | длинный | Версия таблицы, которая была считана для выполнения операции записи. |
isolationLevel | строка | Уровень изоляции, использованный для этой операции. |
isBlindAppend | boolean | Выплнялось ли добавление данных в ходе этой операции. |
operationMetrics | map | Метрики операции (например, число измененных строк и файлов). |
userMetadata | строка | Определяемые пользователем метаданные фиксации, если они были указаны. |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Примечание.
- Некоторые из других столбцов недоступны, если вы записываете в таблицу Delta с помощью следующих методов:
- Столбцы, добавленные позже, всегда будут добавляться после последнего столбца.
Ключевые метрики операции
Операция history
возвращает коллекцию метрик операции в сопоставлении столбцов operationMetrics
.
В следующих таблицах перечислены ключевые определения по операции.
Операция | Имя метрики | Description |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | Число записанных файлов. | |
numOutputBytes | Размер записанного содержимого в байтах. | |
numOutputRows | Число записанных строк. | |
ПОТОКОВАЯ ПЕРЕДАЧА ОБНОВЛЕНИЯ | ||
numAddedFiles | Число добавленных файлов | |
numRemovedFiles | Число удаленных файлов. | |
numOutputRows | Число записанных строк. | |
numOutputBytes | Размер записанных данных в байтах. | |
DELETE | ||
numAddedFiles | Число добавленных файлов Не указывается при удалении секций таблицы. | |
numRemovedFiles | Число удаленных файлов. | |
numDeletedRows | Число удаленных строк. Не указывается при удалении секций таблицы. | |
numCopiedRows | Число строк, скопированных в процессе удаления файлов. | |
executionTimeMs | Время, затраченное на выполнение всей операции. | |
scanTimeMs | Время, затраченное на сканирование файлов на соответствие. | |
rewriteTimeMs | Время, затраченное на перезапись сопоставленных файлов. | |
TRUNCATE | ||
numRemovedFiles | Число удаленных файлов. | |
executionTimeMs | Время, затраченное на выполнение всей операции. | |
MERGE | ||
numSourceRows | Число строк в исходном DataFrame. | |
numTargetRowsInserted | Число строк, вставленных в целевую таблицу. | |
numTargetRowsUpdated | Число строк, обновленных в целевой таблице. | |
numTargetRowsDeleted | Число строк, удаленных в целевой таблице. | |
numTargetRowsCopied | Число скопированных целевых строк. | |
numOutputRows | Число выведенных строк. | |
numTargetFilesAdded | Число файлов, добавленных в приемник (целевой объект). | |
numTargetFilesRemoved | Число файлов, удаленных из приемника (целевого объекта). | |
executionTimeMs | Время, затраченное на выполнение всей операции. | |
scanTimeMs | Время, затраченное на сканирование файлов на соответствие. | |
rewriteTimeMs | Время, затраченное на перезапись сопоставленных файлов. | |
UPDATE | ||
numAddedFiles | Число добавленных файлов | |
numRemovedFiles | Число удаленных файлов. | |
numUpdatedRows | Число обновленных строк. | |
numCopiedRows | Число строк, только что скопированных в процессе обновления файлов. | |
executionTimeMs | Время, затраченное на выполнение всей операции. | |
scanTimeMs | Время, затраченное на сканирование файлов на соответствие. | |
rewriteTimeMs | Время, затраченное на перезапись сопоставленных файлов. | |
FSCK | numRemovedFiles | Число удаленных файлов. |
CONVERT | numConvertedFiles | Число преобразованных файлов Parquet. |
OPTIMIZE | ||
numAddedFiles | Число добавленных файлов | |
numRemovedFiles | Число оптимизированных файлов. | |
numAddedBytes | Число байтов, добавленных после оптимизации таблицы. | |
numRemovedBytes | Число удаленніх байтов. | |
minFileSize | Размер наименьшего файла после оптимизации таблицы. | |
p25FileSize | Размер файла 25-го процентиля после оптимизации таблицы. | |
p50FileSize | Медианный размер файла после оптимизации таблицы. | |
p75FileSize | Размер файла 75-го процентиля после оптимизации таблицы. | |
maxFileSize | Размер наибольшего файла после оптимизации таблицы. | |
CLONE; | ||
sourceTableSize | Размер в байтах исходной таблицы в клонированной версии. | |
sourceNumOfFiles | Число файлов в исходной таблице в клонированной версии. | |
numRemovedFiles | Число файлов, удаленных из целевой таблицы, если была заменена предыдущая таблица Delta. | |
removedFilesSize | Общий размер в байтах файлов, удаленных из целевой таблицы, если была заменена предыдущая таблица Delta. | |
numCopiedFiles | Число файлов, которые были скопированы в новое расположение. 0 для поверхностных клонов. | |
copiedFilesSize | Общий размер файлов в байтах, которые были скопированы в новое расположение. 0 для поверхностных клонов. | |
RESTORE | ||
tableSizeAfterRestore | Размер таблицы в байтах после восстановления. | |
numOfFilesAfterRestore | Число файлов в таблице после восстановления. | |
numRemovedFiles | Число файлов, удаленных операцией восстановления. | |
numRestoredFiles | Число файлов, добавленных в результате восстановления. | |
removedFilesSize | Размер в байтах файлов, удаленных при восстановлении. | |
restoredFilesSize | Размер в байтах файлов, добавленных при восстановлении. | |
Команда VACUUM | ||
numDeletedFiles | Число удаленных файлов. | |
numVacuumedDirectories | Число каталогов, очищенных с помощью операции vacuum. | |
numFilesToDelete | Число удаляемых файлов. |
Что такое путешествие по времени Delta Lake?
Время Delta Lake поддерживает запросы предыдущих версий таблицы на основе метки времени или таблицы (как записано в журнале транзакций). Для таких приложений можно использовать поездку по времени:
- Воссоздание анализа, отчетов или выходных данных (например, выходных данных модели машинного обучения). Это может быть полезно для отладки или аудита, особенно в регулируемых отраслях.
- Написание сложных темпоральных запросов.
- Устранение ошибок в данных.
- Изоляция моментальных снимков для набора запросов для быстро меняющихся таблиц.
Внимание
Версии таблиц, доступные с помощью перемещения по времени, определяются сочетанием порогового значения хранения для файлов журнала транзакций и частоты и указанного хранения для VACUUM
операций. Если вы работаете VACUUM
ежедневно со значениями по умолчанию, для перемещения по времени доступно 7 дней данных.
Синтаксис разностных версий перехода по времени
Запросите таблицу Delta со временем, добавив предложение после спецификации имени таблицы.
timestamp_expression
может быть одним из следующих вариантов:'2018-10-18T22:15:12.013Z'
, то есть строкой, которая может приводиться к метке времени;cast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, то есть строкой даты.current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Любое другое выражение, которое является меткой времени или может быть приведено к ней
version
— это длинное значение, которое можно получить из выходных данныхDESCRIBE HISTORY table_spec
.
Ни timestamp_expression
, ни version
не может быть подзапросом.
Принимаются только строки метки даты или времени. Например, "2019-01-01"
и "2019-01-01T00:00:00.000Z"
. См. следующий код, например синтаксис:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Можно также использовать @
синтаксис, чтобы указать метку времени или версию в составе имени таблицы. Метка времени должна быть указана в формате yyyyMMddHHmmssSSS
. Вы можете указать версию после @
, добавив к версии v
. См. следующий код, например синтаксис:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
Что такое контрольные точки журнала транзакций?
Delta Lake записывает версии таблиц в виде JSON-файлов в _delta_log
каталоге, которые хранятся вместе с данными таблицы. Чтобы оптимизировать запросы контрольных точек, Delta Lake объединяет версии таблиц в файлы контрольных точек Parquet, предотвращая необходимость считывания всех версий журнала таблиц JSON. Azure Databricks оптимизирует частоту контрольных точек для размера данных и рабочей нагрузки. Пользователям не нужно взаимодействовать с контрольными точками напрямую. Частота контрольных точек подлежит изменению без уведомления.
Настройка хранения данных для запросов на поездки по времени
Чтобы запросить предыдущую версию таблицы, необходимо сохранить журнал и файлы данных для этой версии.
Файлы данных удаляются при VACUUM
выполнении таблицы. Delta Lake автоматически удаляет файл журнала после контрольных версий таблицы.
Так как большинство разностных таблиц VACUUM
регулярно выполняются против них, запросы на определенный момент времени должны учитывать пороговое значение VACUUM
хранения, которое составляет 7 дней по умолчанию.
Чтобы увеличить порог хранения данных для таблиц Delta, необходимо настроить следующие свойства таблицы:
delta.logRetentionDuration = "interval <interval>"
: управляет продолжительностью хранения журнала для таблицы. Значение по умолчанию —interval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: определяет пороговое значениеVACUUM
, используемое для удаления файлов данных, на которые больше не ссылается текущая версия таблицы. Значение по умолчанию —interval 7 days
.
Свойства Delta можно указать во время создания таблицы или задать их с помощью инструкции ALTER TABLE
. См. справочник по свойствам таблицы Delta.
Примечание.
Эти свойства необходимо задать, чтобы журнал таблиц сохранялся в течение длительной длительности для таблиц с частыми VACUUM
операциями. Например, чтобы получить доступ к 30 дням исторических данных, задайте (delta.deletedFileRetentionDuration = "interval 30 days"
который соответствует параметру по умолчанию).delta.logRetentionDuration
Увеличение порогового значения хранения данных может привести к увеличению затрат на хранение, так как сохраняются дополнительные файлы данных.
Восстановление таблицы Delta до предыдущего состояния
Вы можете восстановить прежнее состояние таблицы Delta с помощью команды RESTORE
. Разностная Delta внутренне поддерживает предыдущие версии таблицы, что позволяет восстановить ее предыдущее состояние.
В качестве параметров команды RESTORE
поддерживаются версия, соответствующая предыдущему состоянию, или метка времени, когда было создано предыдущее состояние.
Внимание
- Вы можете восстановить уже восстановленную таблицу.
- Вы можете восстановить клонированную таблицу.
- Требуется разрешение
MODIFY
для восстанавливаемой таблицы. - Нельзя восстановить таблицу до более старой версии, в которой файлы данных были удалены вручную или вручную
vacuum
. Восстановление до этой версии по-прежнему возможно, если дляspark.sql.files.ignoreMissingFiles
задано значениеtrue
. - Формат метки времени для восстановления в более раннем состоянии —
yyyy-MM-dd HH:mm:ss
. Также поддерживается предоставление только строки даты (yyyy-MM-dd
).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Сведения о синтаксисе см. в разделе RESTORE.
Внимание
Восстановление считается операцией изменения данных. Записи журнала Delta Lake, добавленные командой RESTORE
, содержат dataChange со значением true. Если у вас есть подчиненное приложение, например задание структурированного потока, которое обрабатывает обновления таблицы Delta Lake, записи журнала изменений данных, добавленные операцией восстановления, рассматриваются как новые обновления данных, и их обработка может привести к дублированию данных.
Например:
Версия таблицы | Операция | Обновления журналов изменений | Записи в обновлениях журнала изменений данных |
---|---|---|---|
0 | ВСТАВИТЬ | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
1 | ВСТАВИТЬ | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Нет записей, так как оптимизация сжатия не изменяет данные в таблице.) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
В предыдущем примере выполнение команды RESTORE
приводит к обновлениям, которые уже были видны при чтении таблицы Delta версий 0 и 1. Если запрос потоковой передачи выполнял чтение этой таблицы, эти файлы будут считаться новыми добавленными данными и будут обработаны снова.
Метрики восстановления
RESTORE
передает следующие метрики в виде DataFrame из одной строки после завершения операции:
table_size_after_restore
— размер таблицы после восстановления;num_of_files_after_restore
— число файлов в таблице после восстановления;num_removed_files
— число файлов, удаленных (логически удаленных) из таблицы;num_restored_files
— число файлов, восстановленных из-за отката;removed_files_size
— общий размер в байтах файлов, удаленных из таблицы;restored_files_size
— общий размер восстановленных файлов в байтах.
Примеры использования перехода по времени Delta Lake
Исправление случайных удалений в таблице для пользователя
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Исправление случайного неправильного обновления таблицы:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Запросите количество новых клиентов, добавленных за последнюю неделю.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Разделы справки найти версию последней фиксации в сеансе Spark?
Чтобы получить номер версии последней фиксации, записанной текущим SparkSession
для всех потоков и всех таблиц, запросите конфигурацию SQL spark.databricks.delta.lastCommitVersionInSession
.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Если фиксация объектом SparkSession
не выполнена, запрос ключа возвращает пустое значение.
Примечание.
Если одни и те же SparkSession
используются совместно в нескольких потоках, это аналогично совместному использованию переменной в нескольких потоках, и могут возникнуть состояния гонки при одновременном обновлении значения конфигурации.