Заметка
Доступ к этой странице требует авторизации. Вы можете попробовать войти в систему или изменить каталог.
Доступ к этой странице требует авторизации. Вы можете попробовать сменить директорию.
Функция create_auto_cdc_flow() создает поток, использующий функциональность CDC (отслеживание изменений данных) декларативных конвейеров Lakeflow Spark для обработки исходных данных из канала изменений (CDF).
Замечание
Эта функция заменяет предыдущую функцию apply_changes(). Две функции имеют одинаковую сигнатуру. Databricks рекомендует обновиться, чтобы использовать новое название.
Это важно
Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой create_auto_cdc_flow() таблицы необходимо включить __START_AT столбцы с тем же типом данных, что __END_AT и sequence_by поля.
Чтобы создать нужную целевую таблицу, можно использовать функцию create_streaming_table() в интерфейсе Python конвейера.
Синтаксис
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
Для обработки create_auto_cdc_flow поведение по умолчанию для событий INSERT и UPDATE — вставка или обновление событий CDC из источника: обновлять строки в целевой таблице, совпадающие с указанными ключами, или вставлять новую строку, если совпадающая запись не существует в целевой таблице. Указание обработки DELETE событий можно выполнить при помощи параметра apply_as_deletes.
Дополнительные сведения о обработке CDC с помощью канала изменений см. в разделе API AUTO CDC: упрощение отслеживания измененных данных с помощью конвейеров. Пример использования функции create_auto_cdc_flow(), см. статью под названием Пример: Обработка SCD типа 1 и SCD типа 2 с использованием исходных данных CDF.
Параметры
| Параметр | Тип | Description |
|---|---|---|
target |
str |
Обязательное. Имя обновляемой таблицы. Функцию create_streaming_table() можно использовать для создания целевой таблицы перед выполнением create_auto_cdc_flow() функции. |
source |
str |
Обязательное. Источник данных, содержащий записи CDC. |
keys |
list |
Обязательное. Столбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Это позволяет определить, какие события CDC применяются к определенным записям в целевой таблице. Можно указать любой из следующих вариантов:
|
sequence_by |
str, col() или struct() |
Обязательное. Имена столбцов, указывающие логический порядок событий CDC в исходных данных. Декларативные конвейеры Lakeflow для Spark используют эту последовательность для обработки событий изменений, поступающих не в хронологическом порядке. Указанный столбец должен быть сортируемым типом данных. Можно указать любой из следующих вариантов:
|
ignore_null_updates |
bool |
Разрешить прием обновлений, содержащих подмножество целевых столбцов. Если событие CDC соответствует существующей строке и ignore_null_updates является True, столбцы с null сохраняют свои существующие значения в целевом объекте. Это также относится к вложенным столбцам со значением null. Если ignore_null_updates это False, существующие значения перезаписываются со значениями null.Значение по умолчанию — False. |
apply_as_deletes |
str или expr() |
Указывает, когда событие CDC следует рассматривать как DELETE вместо upsert. Можно указать любой из следующих вариантов:
Чтобы обрабатывать не упорядоченные данные, удаленная строка временно сохраняется как метка-надгробие в базовой таблице Delta, а в хранилище метаданных создается представление, фильтрующее эти метки-надгробия. Интервал хранения по умолчанию — два дня, это можно настроить с помощью свойства таблицы pipelines.cdc.tombstoneGCThresholdInSeconds. |
apply_as_truncates |
str или expr() |
Указывает, когда событие CDC должно рассматриваться как полная таблица TRUNCATE. Можно указать любой из следующих вариантов:
Поскольку эта команда активирует полную очистку целевой таблицы, её следует использовать только для конкретных случаев, требующих этой функции. Параметр apply_as_truncates поддерживается только для SCD типа 1. SCD type 2 не поддерживает операции усечения данных. |
column_list или except_column_list |
list |
Подмножество столбцов для включения в целевую таблицу. Используется column_list для указания полного списка столбцов для включения. Используется except_column_list для указания столбцов, которые следует исключить. Можно объявить любое значение в виде списка строк или как функции Spark SQL col() :
Аргументы функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId). Значение по умолчанию — включать все столбцы в целевую таблицу, если в функцию не передается ни аргумент column_list, ни аргумент except_column_list. |
stored_as_scd_type |
str или int |
Следует ли хранить записи как SCD типа 1 или SCD типа 2.
1 Установите значение для SCD типа 1 или 2 для SCD типа 2. Значение по умолчанию — SCD тип 1. |
track_history_column_list или track_history_except_column_list |
list |
Подмножество выходных столбцов для отслеживания истории в целевой таблице. Используется track_history_column_list для указания полного списка столбцов для отслеживания. Используется track_history_except_column_list для указания столбцов, которые следует исключить из отслеживания. Можно объявить любое значение в виде списка строк или как функции Spark SQL col() :
Аргументы функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId). Значение по умолчанию — включать все столбцы в целевую таблицу, если в функцию не передается ни аргумент track_history_column_list, ни аргумент track_history_except_column_list. |
name |
str |
Имя потока. Если не указано, по умолчанию используется то же значение, что и target. |
once |
bool |
При необходимости определите поток как одноразовый, например, обратная закачка. Использование once=True меняет поток двумя способами:
|