Поделиться через


create_auto_cdc_from_snapshot_flow

Это важно

Эта функция доступна в общедоступной предварительной версии.

Функция create_auto_cdc_from_snapshot_flow создает поток, который использует функциональность декларативного конвейера Lakeflow Spark с технологией захвата изменения данных (CDC) для обработки исходных данных из моментальных снимков базы данных. Узнайте , как CDC реализован с помощью AUTO CDC FROM SNAPSHOT API?.

Замечание

Эта функция заменяет предыдущую функцию apply_changes_from_snapshot(). Две функции имеют одинаковую сигнатуру. Databricks рекомендует обновиться, чтобы использовать новое название.

Это важно

Для этой операции должна быть целевая таблица потоковой передачи.

Чтобы создать требуемую целевую таблицу, можно использовать функцию create_streaming_table().

Синтаксис

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Замечание

Для AUTO CDC FROM SNAPSHOT обработки поведение по умолчанию заключается в вставке новой строки, если соответствующая запись с теми же ключами не существует в целевом объекте. Если соответствующая запись существует, она обновляется только в том случае, если все значения в строке изменились. Строки с ключами, которые присутствуют в целевом объекте, но больше не встречаются в источнике, удаляются.

Дополнительные сведения об обработке CDC с помощью моментальных снимков см. в статье API AUTO CDC: упрощение отслеживания измененных данных с помощью конвейеров. Примеры использования функции create_auto_cdc_from_snapshot_flow() см. в примерах потребления периодических моментальных снимков и исторических моментальных снимков.

Параметры

Параметр Тип Description
target str Обязательное. Имя обновляемой таблицы. Функцию create_streaming_table() можно использовать для создания целевой таблицы перед выполнением create_auto_cdc_from_snapshot_flow() функции.
source str или lambda function Обязательное. Имя таблицы или представления, для периодического создания моментального снимка, или лямбда-функция Python, которая возвращает обрабатываемый DataFrame моментального снимка и версию моментального снимка. См. раздел "Реализация аргументаsource".
keys list Обязательное. Столбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Это позволяет определить, какие события CDC применяются к определенным записям в целевой таблице. Можно указать любой из следующих вариантов:
  • Список строк: ["userId", "orderId"]
  • Список функций Spark SQLcol(): [col("userId"), col("orderId"] Аргументы функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId).
stored_as_scd_type str или int Следует ли хранить записи как SCD типа 1 или SCD типа 2. 1 Установите значение для SCD типа 1 или 2 для SCD типа 2. Значение по умолчанию — SCD тип 1.
track_history_column_list или track_history_except_column_list list Подмножество выходных столбцов для отслеживания истории в целевой таблице. Используется track_history_column_list для указания полного списка столбцов для отслеживания. Используется track_history_except_column_list для указания столбцов, которые следует исключить из отслеживания. Можно объявить любое значение в виде списка строк или как функции Spark SQL col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Аргументы функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId). Значение по умолчанию — включать все столбцы в целевую таблицу, если в функцию не передаются аргументы track_history_column_list или track_history_except_column_list.

Реализация аргумента source

Функция create_auto_cdc_from_snapshot_flow() включает source аргумент. Для обработки исторических моментальных снимков ожидается, что аргумент source будет лямбда-функцией Python, которая возвращает два значения в функцию create_auto_cdc_from_snapshot_flow(): DataFrame Python, содержащий данные моментального снимка для обработки, и версию моментального снимка.

Ниже приведена подпись лямбда-функции:

lambda Any => Optional[(DataFrame, Any)]
  • Аргументом для лямбда-функции является наиболее недавно обработанная версия моментального снимка.
  • Возвращаемое значение лямбда-функции — None или кортеж из двух значений: первое значение кортежа — это DataFrame, содержащий моментальный снимок для обработки. Вторым значением кортежа является версия моментального снимка, которая обозначает его логический порядок.

Пример, реализующий и вызывающий лямбда-функцию:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Среда выполнения декларативных конвейеров Lakeflow Spark выполняет следующие шаги каждый раз, когда запускается конвейер, содержащий функцию create_auto_cdc_from_snapshot_flow():

  1. next_snapshot_and_version Функция запускается, чтобы загрузить следующий DataFrame для моментального снимка и соответствующую версию моментального снимка.
  2. Если DataFrame не возвращается, выполнение процесса завершается, и обновление конвейера отмечается как завершенное.
  3. Обнаруживает изменения в новом моментальном снимке и пошагово применяет их к целевой таблице.
  4. Возвращается к шагу 1, чтобы загрузить следующий снимок и его версию.