Функция create_auto_cdc_flow()
создает поток, использующий функционал захвата изменений данных (CDC) декларативных конвейеров Lakeflow для обработки исходных данных из потока данных изменений (CDF).
Замечание
Эта функция заменяет предыдущую функцию apply_changes()
. Две функции имеют одинаковую сигнатуру. Databricks рекомендует перейти на использование нового имени.
Это важно
Для применения изменений необходимо указать целевую потоковую таблицу. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы create_auto_cdc_flow()
необходимо включить столбцы __START_AT
и __END_AT
с тем же типом данных, что и поля sequence_by
.
Чтобы создать требуемую целевую таблицу, можно использовать функцию create_streaming_table() в интерфейсе Python для декларативного конвейера Lakeflow.
Синтаксис
import dlt
dlt.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Для обработки create_auto_cdc_flow
стандартное поведение для событий INSERT
и UPDATE
заключается в обновлении или вставке событий CDC из источника: обновляются все строки в целевой таблице, соответствующие указанным ключам, или вставляется новая строка, если соответствующая запись не существует в целевой таблице.
DELETE
Обработку событий можно указать параметром apply_as_deletes
.
Дополнительные сведения об обработке CDC с потоком изменений см. в AUTO CDC API: упрощение отслеживания изменений с помощью декларативных конвейеров Lakeflow. Пример использования функции create_auto_cdc_flow()
см. в разделе Пример: Обработка SCD типа 1 и SCD типа 2 с исходными данными CDF.
Параметры
Параметр |
Тип |
Описание |
target |
str |
Обязательное. Имя обновляемой таблицы. Вы можете использовать функцию create_streaming_table(), чтобы создать целевую таблицу перед выполнением функции create_auto_cdc_flow() . |
source |
str |
Обязательное. Источник данных, содержащий записи CDC. |
keys |
list |
Обязательное. Столбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Это позволяет определить, какие события CDC применяются к определенным записям в целевой таблице. Можно указать любой из следующих вариантов:
- Список строк:
["userId", "orderId"] - Список функций Spark SQL
col() : [col("userId"), col("orderId")] Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) .
|
sequence_by |
str , col() или struct() |
Обязательное. Имена столбцов, указывающие логический порядок событий CDC в исходных данных. Декларативные конвейеры Lakeflow используют эту последовательность для обработки событий изменений, поступающих не в порядке. Указанный столбец должен быть сортируемым типом данных. Можно указать любой из следующих вариантов:
- Строка:
"sequenceNum" - Функция Spark SQL
col() : col("sequenceNum") . Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) . - Объединение
struct() нескольких столбцов для разрешения одинаковых значений: struct("timestamp_col", "id_col") вначале будет упорядочивать по первому полю структуры, затем по второму полю, если найдутся одинаковые значения, и так далее.
|
ignore_null_updates |
bool |
Разрешить принимать обновления, содержащие подмножество целевых столбцов. Если событие CDC соответствует существующей строке и ignore_null_updates равно True , столбцы с null сохраняют свои значения в целевом объекте. Это также относится к вложенным столбцам со значением null . Если ignore_null_updates равно False , существующие значения перезаписываются со значениями null . Значение по умолчанию — False . |
apply_as_deletes |
str или expr() |
Указывает, когда событие CDC следует рассматривать как DELETE , а не как upsert. Можно указать любой из следующих вариантов:
- Строка:
"Operation = 'DELETE'" - Функция Spark SQL
expr() : expr("Operation = 'DELETE'")
Чтобы обрабатывать внеочередные данные, удаленная строка временно сохраняется в виде гробового камня в базовой таблице Delta, и создается представление в хранилище метаданных, фильтрующее эти гробовые камни. Интервал хранения можно настроить с помощью pipelines.cdc.tombstoneGCThresholdInSeconds свойства таблицы. |
apply_as_truncates |
str или expr() |
Указывает, когда событие CDC следует рассматривать как полную таблицу TRUNCATE . Можно указать любой из следующих вариантов:
- Строка:
"Operation = 'TRUNCATE'" - Функция Spark SQL
expr() : expr("Operation = 'TRUNCATE'")
Так как эта оговорка активирует полное усечение целевой таблицы, ее следует использовать только для конкретных вариантов использования, требующих такой функциональности. Параметр apply_as_truncates поддерживается только для SCD типа 1. Тип SCD 2 не поддерживает операции усечения данных. |
column_list или except_column_list |
list |
Подмножество столбцов для включения в целевую таблицу. Используйте column_list для указания полного списка включаемых столбцов. Используйте except_column_list для указания исключаемых столбцов. Вы можете объявить любое из этих значений в виде списка строк или в виде функций col() Spark SQL:
column_list = ["userId", "name", "city"] column_list = [col("userId"), col("name"), col("city")] except_column_list = ["operation", "sequenceNum"] except_column_list = [col("operation"), col("sequenceNum")
Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) . По умолчанию включаются все столбцы в целевой таблице, если в функцию не передается аргумент column_list или except_column_list . |
stored_as_scd_type |
str или int |
Следует ли хранить записи как SCD типа 1 или SCD типа 2. Укажите значение 1 для SCD типа 1 или 2 для SCD типа 2. Значение по умолчанию — SCD тип 1. |
track_history_column_list или track_history_except_column_list |
list |
Подмножество столбцов выходных данных для исторического отслеживания в целевой таблице. Используйте track_history_column_list , чтобы указать полный список столбцов для отслеживания. Используется track_history_except_column_list для указания столбцов, которые следует исключить из отслеживания. Вы можете объявить любое из этих значений в виде списка строк или в виде функций col() Spark SQL:
track_history_column_list = ["userId", "name", "city"] track_history_column_list = [col("userId"), col("name"), col("city")] track_history_except_column_list = ["operation", "sequenceNum"] track_history_except_column_list = [col("operation"), col("sequenceNum")
Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) . По умолчанию включаются все столбцы в целевой таблице, если в функцию не передается аргумент track_history_column_list или track_history_except_column_list . |