API APPLY CHANGES: упрощение отслеживания изменений с помощью разностных динамических таблиц
Разностные динамические таблицы упрощают запись измененных данных (CDC) с APPLY CHANGES
помощью API и APPLY CHANGES FROM SNAPSHOT
API. Используемый интерфейс зависит от источника измененных данных:
- Используется
APPLY CHANGES
для обработки изменений из веб-канала измененных данных (CDF). - Используйте
APPLY CHANGES FROM SNAPSHOT
(общедоступную предварительную версию) для обработки изменений моментальных снимков базы данных.
Ранее инструкция MERGE INTO
часто использовалась для обработки записей CDC в Azure Databricks. MERGE INTO
Однако может привести к неправильным результатам из-за неупорядоченных записей или требуется сложная логика для повторного упорядочивания записей.
APPLY CHANGES
API поддерживается в интерфейсах SQL и Python Для разностных динамических таблиц. APPLY CHANGES FROM SNAPSHOT
API поддерживается в интерфейсе Python Delta Live Tables.
APPLY CHANGES FROM SNAPSHOT
Обе APPLY CHANGES
таблицы поддерживают обновление таблиц с помощью SCD типа 1 и типа 2:
- Используйте SCD типа 1 для обновления записей напрямую. Журнал не сохраняется для обновленных записей.
- Используйте SCD типа 2 для хранения журнала записей во всех обновлениях или обновлениях указанного набора столбцов.
Сведения о синтаксисе и других ссылках см. в следующей статье:
- Изменение записи данных из канала изменений с помощью Python в разностных динамических таблицах
- Изменение записи данных с помощью SQL в разностных динамических таблицах
Примечание.
В этой статье описано, как обновлять таблицы в конвейере Delta Live Tables на основе изменений в исходных данных. Дополнительные сведения о том, как записывать и запрашивать сведения об изменениях на уровне строк для таблиц Delta, см. в статье Использование веб-канала изменений данных Delta Lake в Azure Databricks.
Требования
Чтобы использовать API CDC, конвейер должен быть настроен для использования бессерверных конвейеров DLT или разностных динамических таблиц Pro
или Advanced
выпусков.
Как CDC реализуется с помощью APPLY CHANGES
API?
Автоматически обрабатывая записи вне последовательности, APPLY CHANGES
API в Delta Live Tables обеспечивает правильную обработку записей CDC и удаляет необходимость разработки сложной логики для обработки записей вне последовательности. Необходимо указать столбец в исходных данных, для которых выполняется последовательность записей, которые разностные динамические таблицы интерпретируются как монотонное увеличение правильного порядка исходных данных. Разностные динамические таблицы автоматически обрабатывают данные, поступающие из порядка. Для изменений типа SCD 2 разностные динамические таблицы распространяют соответствующие значения последовательности в целевые таблицы __START_AT
и __END_AT
столбцы. При каждом значении последовательности должно быть одно отдельное обновление, а значения последовательности NULL не поддерживаются.
Чтобы выполнить обработку APPLY CHANGES
CDC, сначала создайте таблицу потоковой передачи, а затем используйте APPLY CHANGES INTO
инструкцию в SQL или apply_changes()
функции в Python, чтобы указать источник, ключи и последовательности для канала изменений. Чтобы создать целевую потоковую таблицу, используйте CREATE OR REFRESH STREAMING TABLE
инструкцию в SQL или create_streaming_table()
функции в Python. См. примеры обработки SCD типа 1 и типа 2.
Дополнительные сведения о синтаксисе см. в справочнике по SQL Delta Live Tables или в справочнике по Python.
Как CDC реализуется с помощью APPLY CHANGES FROM SNAPSHOT
API?
Внимание
APPLY CHANGES FROM SNAPSHOT
API находится в общедоступной предварительной версии.
APPLY CHANGES FROM SNAPSHOT
— это декларативный API, который эффективно определяет изменения исходных данных путем сравнения ряда моментальных снимков в порядке, а затем выполняет обработку, необходимую для обработки записей CDC в моментальных снимках. APPLY CHANGES FROM SNAPSHOT
поддерживается только интерфейсом Python Delta Live Tables.
APPLY CHANGES FROM SNAPSHOT
поддерживает прием моментальных снимков из нескольких типов источников:
- Используйте периодическое прием моментальных снимков для приема моментальных снимков из существующей таблицы или представления.
APPLY CHANGES FROM SNAPSHOT
имеет простой, упрощенный интерфейс для поддержки периодически приема моментальных снимков из существующего объекта базы данных. Новый моментальный снимок выполняется при каждом обновлении конвейера, а время приема используется в качестве версии моментального снимка. При выполнении конвейера в непрерывном режиме несколько моментальных снимков получаются при каждом обновлении конвейера за период, определенный параметром интервала триггера для потока, содержащего обработку APPLY CHANGES FROM SNAPSHOT. - Используйте прием исторических моментальных снимков для обработки файлов, содержащих моментальные снимки базы данных, например моментальные снимки, созданные из базы данных Oracle или MySQL или хранилища данных.
Чтобы выполнить обработку CDC из любого исходного типа APPLY CHANGES FROM SNAPSHOT
, сначала создайте таблицу потоковой передачи, а затем используйте apply_changes_from_snapshot()
функцию в Python, чтобы указать моментальный снимок, ключи и другие аргументы, необходимые для реализации обработки. Ознакомьтесь с примерами приема периодических моментальных снимков и историческими примерами приема моментальных снимков.
Моментальные снимки, передаваемые API, должны находиться в порядке возрастания по версии. Если разностные динамические таблицы обнаруживают моментальный снимок вне порядка, возникает ошибка.
Дополнительные сведения о синтаксисе см. в справочнике по Python для разностных динамических таблиц.
Ограничения
Столбец, используемый для последовательности, должен быть сортируемым типом данных.
Пример: обработка SCD типа 1 и SCD типа 2 с исходными данными CDF
В следующих разделах приведены примеры запросов SCD типа 1 и типа 2 delta Live Tables, которые обновляют целевые таблицы на основе исходных событий из канала измененных данных:
- Создает новые записи пользователей.
- Удаляет запись пользователя.
- Обновляет записи пользователей. В примере SCD типа 1 последние
UPDATE
операции приходят поздно и удаляются из целевой таблицы, демонстрируя обработку событий вне порядка.
В следующих примерах предполагается знакомство с настройкой и обновлением конвейеров Delta Live Tables. См . руководство. Запуск первого конвейера live tables Delta Live Tables.
Для выполнения этих примеров необходимо начать с создания примера набора данных. См. статью "Создание тестовых данных".
Ниже приведены входные записи для этих примеров.
userId | name | city | Операция | sequenceNum |
---|---|---|---|---|
124 | Raul | Оахака | ВСТАВИТЬ | 1 |
123 | Isabel | Monterrey | ВСТАВИТЬ | 1 |
125 | Mercedes | Тихуана | ВСТАВИТЬ | 2 |
126 | Lily | Cancun | ВСТАВИТЬ | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Чиуауа | UPDATE | 5 |
Если вы раскомментируете окончательную строку в примерах данных, вставьте следующую запись, указывающую, где должны быть усечены записи:
userId | name | city | Операция | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
Примечание.
Все приведенные ниже примеры включают параметры для указания обоих DELETE
и TRUNCATE
операций, но каждый из них является необязательным.
Обработка обновлений SCD типа 1
В следующем примере демонстрируется обработка обновлений SCD типа 1:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
После выполнения примера с SCD типа 1 целевая таблица будет содержать следующие записи:
userId | name | city |
---|---|---|
124 | Raul | Оахака |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
После запуска примера SCD типа 1 с дополнительной записью TRUNCATE
записи 124
и 126
усекаются из-за операции TRUNCATE
в sequenceNum=3
, а целевая таблица содержит следующую запись:
userId | name | city |
---|---|---|
125 | Mercedes | Guadalajara |
Обработка обновлений SCD типа 2
В следующем примере демонстрируется обработка обновлений SCD типа 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
После выполнения примера с SCD типа 2 целевая таблица будет содержать следующие записи:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Чиуауа | 5 | 6 |
124 | Raul | Оахака | 1 | null |
125 | Mercedes | Тихуана | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lily | Cancun | 2 | null |
Запрос типа 2 SCD также может указать подмножество выходных столбцов для отслеживания журнала в целевой таблице. Изменения других столбцов обновляются вместо создания новых записей журнала. В следующем примере показано, как исключить столбец из отслеживания city
:
В следующем примере показано использование журнала отслеживания с типом 2 SCD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
После выполнения этого примера без дополнительной TRUNCATE
записи целевая таблица содержит следующие записи:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Чиуауа | 1 | 6 |
124 | Raul | Оахака | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lily | Cancun | 2 | null |
Создание тестовых данных
Приведенный ниже код содержится для создания примера набора данных для использования в примерах запросов, представленных в этом руководстве. Если у вас есть необходимые учетные данные для создания новой схемы и создания новой таблицы, эти инструкции можно выполнить с помощью записной книжки или Databricks SQL. Следующий код не предназначен для запуска в составе конвейера Delta Live Tables:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Пример. Периодическое обработка моментальных снимков
В следующем примере показана обработка SCD типа 2, которая обрабатывает моментальные снимки таблицы, хранимой в mycatalog.myschema.mytable
ней. Результаты обработки записываются в таблицу с именем target
.
mycatalog.myschema.mytable
записи в метке времени 2024-01-01 00:00:00
Ключ | Значение |
---|---|
1 | А1 |
2 | А2 |
mycatalog.myschema.mytable
записи в метке времени 2024-01-01 12:00:00
Ключ | Значение |
---|---|
2 | Б2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
После обработки моментальных снимков целевая таблица содержит следующие записи:
Ключ | Значение | __START_AT | __END_AT |
---|---|---|---|
1 | А1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | А2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | Б2 | 2024-01-01 12:00:00 | null |
3 | a3 | 2024-01-01 12:00:00 | null |
Пример: обработка исторических моментальных снимков
В следующем примере показана обработка SCD типа 2, которая обновляет целевую таблицу на основе исходных событий из двух моментальных снимков, хранящихся в облачной системе хранения:
Моментальный снимок в timestamp
, хранящийся в /<PATH>/filename1.csv
Ключ | TrackingColumn | NonTrackingColumn |
---|---|---|
1 | А1 | b1 |
2 | А2 | Б2 |
4 | a4 | b4 |
Моментальный снимок в timestamp + 5
, хранящийся в /<PATH>/filename2.csv
Ключ | TrackingColumn | NonTrackingColumn |
---|---|---|
2 | a2_new | Б2 |
3 | a3 | b3 |
4 | a4 | b4_new |
В следующем примере кода демонстрируется обработка обновлений SCD типа 2 с этими моментальными снимками:
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
После обработки моментальных снимков целевая таблица содержит следующие записи:
Ключ | TrackingColumn | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | А1 | b1 | 1 | 2 |
2 | А2 | Б2 | 1 | 2 |
2 | a2_new | Б2 | 2 | null |
3 | a3 | b3 | 2 | null |
4 | a4 | b4_new | 1 | null |
Добавление, изменение или удаление данных в целевой потоковой таблице
Если конвейер публикует таблицы в каталоге Unity, можно использовать инструкции языка обработки данных (DML), включая инструкции insert, update, delete и merge, для изменения целевых таблиц потоковой передачи, созданных APPLY CHANGES INTO
операторами.
Примечание.
- Инструкции DML, изменяющие схему таблицы потоковой передачи, не поддерживаются. Убедитесь, что операторы DML не пытаются развивать схему таблицы.
- Инструкции DML, обновляющие потоковую таблицу, могут выполняться только в общем кластере каталога Unity или хранилище SQL с помощью Databricks Runtime 13.3 LTS и более поздних версий.
- Так как для потоковой передачи требуются источники данных только для добавления, если обработка требует потоковой передачи из исходной таблицы потоковой передачи с изменениями (например, операторами DML), задайте флаг skipChangeCommits при чтении исходной таблицы потоковой передачи. При
skipChangeCommits
установке транзакции, которые удаляют или изменяют записи в исходной таблице, игнорируются. Если для обработки не требуется потоковая таблица, можно использовать материализованное представление (которое не имеет ограничения только для добавления) в качестве целевой таблицы.
Так как разностные динамические таблицы используют указанный SEQUENCE BY
столбец и распространяют соответствующие значения последовательности в __START_AT
__END_AT
целевые таблицы (для SCD типа 2), необходимо убедиться, что инструкции DML используют допустимые значения для этих столбцов для поддержания правильного порядка записей. Узнайте, как CDC реализован с помощью API APPLY CHANGES?.
Дополнительные сведения об использовании инструкций DML с таблицами потоковой передачи см. в разделе "Добавление, изменение или удаление данных" в таблице потоковой передачи.
В следующем примере вставляется активная запись с начальной последовательностью 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Чтение веб-канала изменений из целевой APPLY CHANGES
таблицы
В Databricks Runtime 15.2 и более поздних версиях можно считывать веб-канал данных изменений из таблицы потоковой передачи, которая является целью APPLY CHANGES
или APPLY CHANGES FROM SNAPSHOT
запросами таким же образом, как и чтение веб-канала изменений из других таблиц Delta. Для чтения веб-канала изменений из целевой таблицы потоковой передачи необходимо следующее:
- Целевая таблица потоковой передачи должна быть опубликована в каталоге Unity. См. раздел "Использование каталога Unity" с конвейерами Delta Live Tables.
- Чтобы прочитать веб-канал изменений из целевой потоковой таблицы, необходимо использовать Databricks Runtime 15.2 или более поздней версии. Чтобы прочитать веб-канал изменений в другом конвейере разностных динамических таблиц, конвейер должен быть настроен для использования Databricks Runtime 15.2 или более поздней версии.
Вы считываете веб-канал изменений из целевой таблицы потоковой передачи, созданной в конвейере Разностных динамических таблиц, так же, как чтение веб-канала изменений из других таблиц Delta. Дополнительные сведения об использовании функции веб-канала изменений Delta, включая примеры в Python и SQL, см. в статье Использование веб-канала изменений Delta Lake в Azure Databricks.
Примечание.
Запись канала изменений включает метаданные , определяющие тип события изменения. При обновлении записи в таблице метаданные связанных записей изменений обычно включают _change_type
значения, заданные update_preimage
и update_postimage
события.
Однако значения отличаются, _change_type
если обновления вносятся в целевую потоковую таблицу, включающую изменение значений первичного ключа. Если изменения включают обновления первичных ключей, _change_type
поля метаданных задаются insert
и delete
события. Изменения первичных ключей могут возникать при внесении обновлений вручную в одно из ключевых полей с UPDATE
инструкцией или MERGE
для таблиц SCD типа 2, когда __start_at
поле изменяется на более раннее начальное значение последовательности.
Запрос APPLY CHANGES
определяет значения первичного ключа, которые отличаются для обработки SCD типа 1 и SCD типа 2:
- Для обработки SCD типа 1 и интерфейса Python Delta Live Tables первичный ключ является значением
keys
параметра вapply_changes()
функции. Для интерфейса SQL Delta Live Tables первичный ключ — это столбцы, определенные предложениемKEYS
в инструкцииAPPLY CHANGES INTO
. - Для SCD типа 2 первичный ключ является
keys
параметром илиKEYS
предложением, а также возвращаемым значением операцииcoalesce(__START_AT, __END_AT)
, где__START_AT
и__END_AT
являются соответствующими столбцами из целевой потоковой таблицы.
Получение данных о записях, обработанных запросом CDC Delta Live Tables
Примечание.
Следующие метрики фиксируются только APPLY CHANGES
запросами, а не APPLY CHANGES FROM SNAPSHOT
запросами.
Следующие метрики фиксируются запросами APPLY CHANGES
:
num_upserted_rows
: количество выходных строк, которые добавляются в набор данных во время обновления.num_deleted_rows
: количество существующих выходных строк, удаленных из набора данных во время обновления.
Метрика, выходные num_output_rows
данные для потоков, отличных от CDC, не фиксируются для apply changes
запросов.
Какие объекты данных используются для обработки CDC разностных динамических таблиц?
Примечание. Следующие структуры данных применяются только к APPLY CHANGES
обработке, а не APPLY CHANGES FROM SNAPSHOT
к обработке.
При объявлении целевой таблицы в хранилище метаданных Hive создаются две структуры данных:
- Представление с именем, назначенным целевой таблице.
- Внутренняя резервная таблица, используемая Delta Live Table для управления обработкой CDC. Эта таблица называется путем подготовки
__apply_changes_storage_
к имени целевой таблицы.
Например, если объявить целевую таблицу с именем dlt_cdc_target
, вы увидите представление с именем и таблицей с именем dlt_cdc_target
__apply_changes_storage_dlt_cdc_target
в хранилище метаданных. Создание представления позволяет разностным динамическим таблицам отфильтровать дополнительные сведения (например, на могилах и версиях), необходимые для обработки данных вне порядка. Чтобы просмотреть обработанные данные, выполните запрос к целевому представлению. Так как схема таблицы может измениться для поддержки __apply_changes_storage_
будущих функций или улучшений, не следует запрашивать таблицу для использования в рабочей среде. При добавлении данных вручную в таблицу предполагается, что записи будут поступать до других изменений, так как столбцы версии отсутствуют.
Если конвейер публикуется в каталоге Unity, внутренние резервные таблицы недоступны для пользователей.