Функция с табличным значением table_changes
Область применения: Databricks SQL Databricks Runtime
Возвращает журнал изменений в таблицу Delta Lake с включенным изменением веб-канала данных.
Чтобы вызвать эту функцию, необходимо иметь по крайней мере одну из следующих функций:
- Привилегия
SELECT
для указанной таблицы. - Быть владельцем таблицы.
- Обладать правами администратора.
Синтаксис
table_changes ( table_str, start [, end ] )
Аргументы
table_str
: литерал STRING, представляющий необязательно полное имя таблицы.start
: литерал BIGINT или TIMESTAMP, представляющий первую версию или метку времени возврата изменений.end
: необязательный литерал BIGINT или TIMESTAMP, представляющий последнюю версию или метку времени возврата изменений. Если не указано, все изменения, внесенные вstart
(вплоть до текущего изменения), возвращаются.
Возвраты
Таблица, включая все столбцы таблицы, определенной в table_str
, а также следующие столбцы:
_change_type STRING NOT NULL
Указывает изменение:
delete
,insert
,update_preimage
илиupdate_postimage
_commit_version BIGINT NOT NULL
Указывает версию фиксации таблицы, связанной с изменением.
_commit_timestamp TIMESTAMP NOT NULL
Указывает метку времени фиксации, связанной с изменением.
Если table_str
не представляет полное имя таблицы, то имя квалифицируется со значением current_schema
.
Если имя таблицы содержит пробелы или точки, используйте обратные кавычки в строке, чтобы взять эту часть имени в кавычки.
Примеры
-- Create a Delta table with Change Data Feed;
> CREATE TABLE myschema.t(c1 INT, c2 STRING) TBLPROPERTIES(delta.enableChangeDataFeed=true);
-- Modify the table
> INSERT INTO myschema.t VALUES (1, 'Hello'), (2, 'World');
> INSERT INTO myschema.t VALUES (3, '!');
> UPDATE myschema.t SET c2 = upper(c2) WHERE c1 < 3;
> DELETE FROM myschema.t WHERE c1 = 3;
-- Show the history of table change events
> DESCRIBE HISTORY myschema.t;
version timestamp userId userName operation operationParameters ...
4 2022-09-01T18:32:35.000+0000 6167625779053302 alf@melmak.et DELETE {"predicate":"[\"(spark_catalog.myschema.t.c1 = 3)\"]"}
3 2022-09-01T18:32:32.000+0000 6167625779053302 alf@melmak.et UPDATE {"predicate":"(c1#3195878 < 3)"}
2 2022-09-01T18:32:28.000+0000 6167625779053302 alf@melmak.et WRITE {"mode":"Append","partitionBy":"[]"}
1 2022-09-01T18:32:26.000+0000 6167625779053302 alf@melmak.et WRITE {"mode":"Append","partitionBy":"[]"}
0 2022-09-01T18:32:23.000+0000 6167625779053302 alf@melmak.et CREATE TABLE {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{\"delta.enableChangeDataFeed\":\"true\"}"}
-- Show the change table feed using a the commit timestamp retrieved from the history.
> SELECT * FROM table_changes('`myschema`.`t`', 2);
c1 c2 _change_type _commit_version _commit_timestamp
3 ! insert 2 2022-09-01T18:32:28.000+0000
2 WORLD update_postimage 3 2022-09-01T18:32:32.000+0000
2 World update_preimage 3 2022-09-01T18:32:32.000+0000
1 Hello update_preimage 3 2022-09-01T18:32:32.000+0000
1 HELLO update_postimage 3 2022-09-01T18:32:32.000+0000
3 ! delete 4 2022-09-01T18:32:35.000+0000
-- Show the ame change table feed using a point in time.
> SELECT * FROM table_changes('`myschema`.`t`', '2022-09-01T18:32:27.000+0000') ORDER BY _commit_version;
c1 c2 _change_type _commit_version _commit_timestamp
3 ! insert 2 2022-09-01T18:32:28.000+0000
2 WORLD update_postimage 3 2022-09-01T18:32:32.000+0000
2 World update_preimage 3 2022-09-01T18:32:32.000+0000
1 Hello update_preimage 3 2022-09-01T18:32:32.000+0000
1 HELLO update_postimage 3 2022-09-01T18:32:32.000+0000
3 ! delete 4 2022-09-01T18:32:35.000+0000