ПРИМЕНЕНИЕ API ИЗМЕНЕНИЙ: упрощение отслеживания изменений в разностных динамических таблицах
Разностные динамические таблицы упрощают запись измененных данных (CDC) с APPLY CHANGES
помощью API. Ранее инструкция MERGE INTO
часто использовалась для обработки записей CDC в Azure Databricks. MERGE INTO
Однако может привести к неправильным результатам из-за неупорядоченных записей или требовать сложную логику для повторного порядка записей.
Автоматически обрабатывая записи вне последовательности, APPLY CHANGES
API в Delta Live Tables обеспечивает правильную обработку записей CDC и удаляет необходимость разработки сложной логики для обработки записей вне последовательности.
API APPLY CHANGES
поддерживается в интерфейсах SQL и Python Delta Live Tables, включая поддержку обновления таблиц с scD типа 1 и типа 2:
- Используйте SCD типа 1 для обновления записей напрямую. Для обновляемых записей журнал не сохраняется.
- Используйте SCD типа 2 для хранения журнала записей во всех обновлениях или обновлениях указанного набора столбцов.
Сведения о синтаксисе и других ссылках см. в следующей статье:
- Изменение записи данных с помощью Python в разностных динамических таблицах
- Изменение записи данных с помощью SQL в разностных динамических таблицах
- Управление контрольным камнем для запросов типа 1 SCD
Примечание.
В этой статье описано, как обновлять таблицы в конвейере Delta Live Tables на основе изменений в исходных данных. Дополнительные сведения о том, как записывать и запрашивать сведения об изменениях на уровне строк для таблиц Delta, см. в статье Использование веб-канала изменений данных Delta Lake в Azure Databricks.
Как CDC реализуется с помощью разностных динамических таблиц?
Необходимо указать столбец в исходных данных, для которых выполняется последовательность записей, которые разностные динамические таблицы интерпретируются как монотонное увеличение правильного порядка исходных данных. Разностные динамические таблицы автоматически обрабатывают данные, поступающие из порядка. Для изменений типа SCD 2 разностные динамические таблицы распространяют соответствующие значения последовательности в __START_AT
__END_AT
столбцы целевой таблицы. При каждом значении последовательности должно быть одно отдельное обновление, а значения последовательности NULL не поддерживаются.
Чтобы выполнить обработку CDC с помощью Delta Live Table, сначала создайте потоковую таблицу, а затем используйте APPLY CHANGES INTO
инструкцию, чтобы указать источник, ключи и последовательности для канала изменений. Чтобы создать целевую потоковую таблицу, используйте CREATE OR REFRESH STREAMING TABLE
инструкцию в SQL или create_streaming_table()
функции в Python. Чтобы создать инструкцию, определяющую обработку CDC, используйте APPLY CHANGES
инструкцию в SQL или apply_changes()
функции в Python. Сведения о синтаксисе см. в разделе "Изменение записи данных с помощью SQL в разностных динамических таблицах " или "Изменение данных" с помощью Python в разностных динамических таблицах.
Какие объекты данных используются для обработки CDC разностных динамических таблиц?
При объявлении целевой таблицы в хранилище метаданных Hive создаются две структуры данных:
- Представление с именем, назначенным целевой таблице.
- Внутренняя резервная таблица, используемая Delta Live Table для управления обработкой CDC. Эта таблица называется путем подготовки
__apply_changes_storage_
к имени целевой таблицы.
Например, если объявить целевую таблицу с именем dlt_cdc_target
, вы увидите представление с именем и таблицей с именем dlt_cdc_target
__apply_changes_storage_dlt_cdc_target
в хранилище метаданных. Создание представления позволяет разностным динамическим таблицам отфильтровать дополнительные сведения (например, на могилах и версиях), необходимые для обработки данных вне порядка. Чтобы просмотреть обработанные данные, выполните запрос к целевому представлению. Так как схема таблицы может измениться для поддержки __apply_changes_storage_
будущих функций или улучшений, не следует запрашивать таблицу для использования в рабочей среде. При добавлении данных вручную в таблицу предполагается, что записи будут поступать до других изменений, так как столбцы версии отсутствуют.
Если конвейер публикуется в каталоге Unity, внутренние резервные таблицы недоступны для пользователей.
Получение данных о записях, обработанных запросом CDC Delta Live Tables
Следующие метрики фиксируются запросами apply changes
:
num_upserted_rows
: количество выходных строк, которые добавляются в набор данных во время обновления.num_deleted_rows
: количество существующих выходных строк, удаленных из набора данных во время обновления.
num_output_rows
Метрика, которая выводится для потоков, отличных от CDC, не фиксируется для apply changes
запросов.
Ограничения
Целевой APPLY CHANGES INTO
объект запроса или apply_changes
функции нельзя использовать в качестве источника для потоковой таблицы. Таблица, которая считывается из целевого APPLY CHANGES INTO
объекта запроса или apply_changes
функции, должна быть материализованным представлением.
SCD типа 1 и 2 в Azure Databricks
В следующих разделах приведены примеры, демонстрирующие scD типа 1 и типа 2 запросов Delta Live Tables, которые обновляют целевые таблицы на основе исходных событий, которые:
- создают новые записи пользователей;
- удаляют запись пользователя;
- обновляют записи пользователя. В примере SCD типа 1 последние
UPDATE
операции приходят поздно и удаляются из целевой таблицы, демонстрируя обработку событий вне порядка.
В следующих примерах предполагается знакомство с настройкой и обновлением конвейеров Delta Live Tables. См . руководство. Запуск первого конвейера live tables Delta Live Tables.
Для выполнения этих примеров необходимо начать с создания примера набора данных. См. статью "Создание тестовых данных".
Ниже приведены входные записи для этих примеров.
userId | name | city | Операция | sequenceNum |
---|---|---|---|---|
124 | Raul | Оахака | ВСТАВИТЬ | 1 |
123 | Isabel | Monterrey | ВСТАВИТЬ | 1 |
125 | Mercedes | Тихуана | ВСТАВИТЬ | 2 |
126 | Lily | Cancun | ВСТАВИТЬ | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Чихуахуа | UPDATE | 5 |
Если вы раскомментируете окончательную строку в примерах данных, вставьте следующую запись, указывающую, где должны быть усечены записи:
userId | name | city | Операция | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
Примечание.
Все приведенные ниже примеры включают параметры для указания обоих DELETE
операций и TRUNCATE
операций, но все из них являются необязательными.
Обработка обновлений SCD типа 1
В следующем примере кода демонстрируется обработка обновлений SCD типа 1:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
После выполнения примера с SCD типа 1 целевая таблица будет содержать следующие записи:
userId | name | city |
---|---|---|
124 | Raul | Оахака |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
После запуска примера SCD типа 1 с дополнительной записью TRUNCATE
записи 124
и 126
усекаются из-за операции TRUNCATE
в sequenceNum=3
, а целевая таблица содержит следующую запись:
userId | name | city |
---|---|---|
125 | Mercedes | Guadalajara |
Обработка обновлений SCD типа 2
В следующем примере кода демонстрируется обработка обновлений SCD типа 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
После выполнения примера с SCD типа 2 целевая таблица будет содержать следующие записи:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Чихуахуа | 5 | 6 |
124 | Raul | Оахака | 1 | null |
125 | Mercedes | Тихуана | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lily | Cancun | 2 | null |
Запрос типа 2 SCD также может указать подмножество выходных столбцов для отслеживания журнала в целевой таблице. Изменения других столбцов обновляются вместо создания новых записей журнала. В следующем примере показано, как исключить столбец из отслеживания city
:
В следующем примере показано использование журнала отслеживания с типом 2 SCD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
После выполнения этого примера без дополнительной TRUNCATE
записи целевая таблица содержит следующие записи:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Чихуахуа | 1 | 6 |
124 | Raul | Оахака | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lily | Cancun | 2 | null |
Создание тестовых данных
Приведенный ниже код содержится для создания примера набора данных для использования в примерах запросов, представленных в этом руководстве. Если у вас есть необходимые учетные данные для создания новой схемы и создания новой таблицы, вы можете выполнить эти инструкции с помощью записной книжки или Databricks SQL. Следующий код не предназначен для запуска в составе конвейера Delta Live Tables:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Добавление, изменение или удаление данных в целевой потоковой таблице
Если конвейер публикует таблицы в каталоге Unity, можно использовать инструкции языка обработки данных (DML), включая инструкции insert, update, delete и merge, для изменения целевых таблиц потоковой передачи, созданных APPLY CHANGES INTO
операторами.
Примечание.
- Инструкции DML, изменяющие схему таблицы потоковой передачи, не поддерживаются. Убедитесь, что операторы DML не пытаются развивать схему таблицы.
- Инструкции DML, обновляющие потоковую таблицу, могут выполняться только в общем кластере каталога Unity или хранилище SQL с помощью Databricks Runtime 13.3 LTS и более поздних версий.
- Так как для потоковой передачи требуются источники данных только для добавления, если обработка требует потоковой передачи из исходной таблицы потоковой передачи с изменениями (например, операторами DML), задайте флаг skipChangeCommits при чтении исходной таблицы потоковой передачи. При
skipChangeCommits
установке транзакции, которые удаляют или изменяют записи в исходной таблице, игнорируются. Если для обработки не требуется потоковая таблица, можно использовать материализованное представление (которое не имеет ограничения только для добавления) в качестве целевой таблицы.
Так как разностные динамические таблицы используют указанный SEQUENCE BY
столбец и распространяют соответствующие значения последовательности в __START_AT
__END_AT
целевые таблицы (для SCD типа 2), необходимо убедиться, что инструкции DML используют допустимые значения для этих столбцов для поддержания правильного порядка записей. Узнайте, как CDC реализуется с помощью Delta Live Tables?.
Дополнительные сведения об использовании инструкций DML с таблицами потоковой передачи см. в разделе "Добавление, изменение или удаление данных" в таблице потоковой передачи.
В следующем примере вставляется активная запись с начальной последовательностью 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);