Поделиться через


создать_авто_захват_данных_из_снимка_поток

Это важно

Эта функция доступна в общедоступной предварительной версии.

Функция create_auto_cdc_from_snapshot_flow создает поток, использующий функциональность фиксации изменений данных (CDC) в декларативных конвейерах Lakeflow для обработки исходных данных из моментальных снимков базы данных. См. , как CDC реализуется с помощью API AUTO CDC FROM SNAPSHOT.

Замечание

Эта функция заменяет предыдущую функцию apply_changes_from_snapshot(). Две функции имеют одинаковую сигнатуру. Databricks рекомендует перейти на использование нового имени.

Это важно

Для этой операции должна быть целевая таблица потоковой передачи.

Чтобы создать требуемую целевую таблицу, можно использовать функцию create_streaming_table().

Синтаксис

import dlt

dlt.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Замечание

Для обработки AUTO CDC FROM SNAPSHOT поведение по умолчанию заключается в том, чтобы вставить новую строку, если соответствующая запись с теми же ключами не существует в целевом объекте. Если соответствующая запись существует, она обновляется только в том случае, если все значения в строке изменились. Строки с ключами, которые присутствуют в целевом объекте, но больше не значатся в источнике, удаляются.

Чтобы узнать больше об обработке CDC с моментальными снимками, см. статью 'API AUTO CDC: упрощение отслеживания изменений с помощью декларативных конвейеров Lakeflow'. Примеры использования функции create_auto_cdc_from_snapshot_flow() смотрите в примере периодического приема моментальных снимков и примере приема исторических моментальных снимков.

Параметры

Параметр Тип Описание
target str Обязательное. Имя обновляемой таблицы. Вы можете использовать функцию create_streaming_table(), чтобы создать целевую таблицу перед выполнением функции create_auto_cdc_from_snapshot_flow().
source str или lambda function Обязательное. Либо имя таблицы или представления для создания периодических моментальных снимков, либо лямбда-функция на Python, которая возвращает DataFrame моментального снимка для последующей обработки и версию моментального снимка. См. Реализация аргумента source
keys list Обязательное. Столбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Это позволяет определить, какие события CDC применяются к определенным записям в целевой таблице. Можно указать любой из следующих вариантов:
  • Список строк: ["userId", "orderId"]
  • Список функций Spark SQLcol(): [col("userId"), col("orderId"]
    Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId).
stored_as_scd_type str или int Следует ли хранить записи как SCD типа 1 или SCD типа 2. Укажите значение 1 для SCD типа 1 или 2 для SCD типа 2. Значение по умолчанию — SCD тип 1.
track_history_column_list или track_history_except_column_list list Подмножество столбцов выходных данных для исторического отслеживания в целевой таблице. Используйте track_history_column_list, чтобы указать полный список столбцов для отслеживания. Используется track_history_except_column_list для указания столбцов, которые следует исключить из отслеживания. Вы можете объявить любое из этих значений в виде списка строк или в виде функций col() Spark SQL:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId). По умолчанию включаются все столбцы в целевой таблице, если в функцию не передается аргумент track_history_column_list или track_history_except_column_list.

Реализация аргумента source

Функция create_auto_cdc_from_snapshot_flow() включает аргумент source. Для обработки исторических моментальных снимков аргумент source, как ожидается, будет лямбда-функцией Python, которая возвращает два значения функции create_auto_cdc_from_snapshot_flow(): DataFrame Python, содержащий данные моментального снимка для обработки, и версию моментального снимка.

Ниже приведена подпись лямбда-функции:

lambda Any => Optional[(DataFrame, Any)]
  • Аргумент для лямбда-функции — это самая последняя обработанная версия моментального снимка.
  • Возвращаемое значение лямбда-функции — None или кортеж из двух значений: первое значение кортежа — это DataFrame, содержащий снимок данных для обработки. Вторым значением кортежа является версия моментального снимка, определяющая логический порядок этого снимка.

Пример, реализующий и вызывающий лямбда-функцию:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Исполняющая среда деклараторных пайплайнов Lakeflow выполняет следующие шаги каждый раз, когда запускается конвейер, содержащий функцию create_auto_cdc_from_snapshot_flow():

  1. Запускает функцию next_snapshot_and_version для загрузки следующего моментального снимка DataFrame и соответствующей версии этого снимка.
  2. Если DataFrame не возвращается, выполнение завершается, и обновление конвейера помечается как завершённое.
  3. Обнаруживает изменения в новом снимке состояния и поэтапно применяет их к целевой таблице.
  4. Возвращается к первому шагу, чтобы загрузить следующий снапшот и его версию.