Поделиться через


create_auto_cdc_flow

Функция create_auto_cdc_flow() создает поток, использующий функционал захвата изменений данных (CDC) декларативных конвейеров Lakeflow для обработки исходных данных из потока данных изменений (CDF).

Замечание

Эта функция заменяет предыдущую функцию apply_changes(). Две функции имеют одинаковую сигнатуру. Databricks рекомендует перейти на использование нового имени.

Это важно

Для применения изменений необходимо указать целевую потоковую таблицу. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы create_auto_cdc_flow() необходимо включить столбцы __START_AT и __END_AT с тем же типом данных, что и поля sequence_by.

Чтобы создать требуемую целевую таблицу, можно использовать функцию create_streaming_table() в интерфейсе Python для декларативного конвейера Lakeflow.

Синтаксис

import dlt

dlt.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Для обработки create_auto_cdc_flow стандартное поведение для событий INSERT и UPDATE заключается в обновлении или вставке событий CDC из источника: обновляются все строки в целевой таблице, соответствующие указанным ключам, или вставляется новая строка, если соответствующая запись не существует в целевой таблице. DELETE Обработку событий можно указать параметром apply_as_deletes.

Дополнительные сведения об обработке CDC с потоком изменений см. в AUTO CDC API: упрощение отслеживания изменений с помощью декларативных конвейеров Lakeflow. Пример использования функции create_auto_cdc_flow() см. в разделе Пример: Обработка SCD типа 1 и SCD типа 2 с исходными данными CDF.

Параметры

Параметр Тип Описание
target str Обязательное. Имя обновляемой таблицы. Вы можете использовать функцию create_streaming_table(), чтобы создать целевую таблицу перед выполнением функции create_auto_cdc_flow().
source str Обязательное. Источник данных, содержащий записи CDC.
keys list Обязательное. Столбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Это позволяет определить, какие события CDC применяются к определенным записям в целевой таблице. Можно указать любой из следующих вариантов:
  • Список строк: ["userId", "orderId"]
  • Список функций Spark SQLcol(): [col("userId"), col("orderId")] Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId).
sequence_by str, col() или struct() Обязательное. Имена столбцов, указывающие логический порядок событий CDC в исходных данных. Декларативные конвейеры Lakeflow используют эту последовательность для обработки событий изменений, поступающих не в порядке. Указанный столбец должен быть сортируемым типом данных. Можно указать любой из следующих вариантов:
  • Строка: "sequenceNum"
  • Функция Spark SQL col() : col("sequenceNum"). Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId).
  • Объединение struct() нескольких столбцов для разрешения одинаковых значений: struct("timestamp_col", "id_col") вначале будет упорядочивать по первому полю структуры, затем по второму полю, если найдутся одинаковые значения, и так далее.
ignore_null_updates bool Разрешить принимать обновления, содержащие подмножество целевых столбцов. Если событие CDC соответствует существующей строке и ignore_null_updates равно True, столбцы с null сохраняют свои значения в целевом объекте. Это также относится к вложенным столбцам со значением null. Если ignore_null_updates равно False, существующие значения перезаписываются со значениями null.
Значение по умолчанию — False.
apply_as_deletes str или expr() Указывает, когда событие CDC следует рассматривать как DELETE, а не как upsert. Можно указать любой из следующих вариантов:
  • Строка: "Operation = 'DELETE'"
  • Функция Spark SQL expr(): expr("Operation = 'DELETE'")

Чтобы обрабатывать внеочередные данные, удаленная строка временно сохраняется в виде гробового камня в базовой таблице Delta, и создается представление в хранилище метаданных, фильтрующее эти гробовые камни. Интервал хранения можно настроить с помощью pipelines.cdc.tombstoneGCThresholdInSeconds свойства таблицы.
apply_as_truncates str или expr() Указывает, когда событие CDC следует рассматривать как полную таблицу TRUNCATE. Можно указать любой из следующих вариантов:
  • Строка: "Operation = 'TRUNCATE'"
  • Функция Spark SQL expr(): expr("Operation = 'TRUNCATE'")

Так как эта оговорка активирует полное усечение целевой таблицы, ее следует использовать только для конкретных вариантов использования, требующих такой функциональности. Параметр apply_as_truncates поддерживается только для SCD типа 1. Тип SCD 2 не поддерживает операции усечения данных.
column_list или except_column_list list Подмножество столбцов для включения в целевую таблицу. Используйте column_list для указания полного списка включаемых столбцов. Используйте except_column_list для указания исключаемых столбцов. Вы можете объявить любое из этих значений в виде списка строк или в виде функций col() Spark SQL:
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId). По умолчанию включаются все столбцы в целевой таблице, если в функцию не передается аргумент column_list или except_column_list.
stored_as_scd_type str или int Следует ли хранить записи как SCD типа 1 или SCD типа 2. Укажите значение 1 для SCD типа 1 или 2 для SCD типа 2. Значение по умолчанию — SCD тип 1.
track_history_column_list или track_history_except_column_list list Подмножество столбцов выходных данных для исторического отслеживания в целевой таблице. Используйте track_history_column_list, чтобы указать полный список столбцов для отслеживания. Используется track_history_except_column_list для указания столбцов, которые следует исключить из отслеживания. Вы можете объявить любое из этих значений в виде списка строк или в виде функций col() Spark SQL:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId). По умолчанию включаются все столбцы в целевой таблице, если в функцию не передается аргумент track_history_column_list или track_history_except_column_list.