Поделиться через


AUTO CDC INTO (конвейеры)

Используйте инструкцию AUTO CDC ... INTO для создания потока, использующего функциональность CDC (отслеживание изменений в данных) декларативных конвейеров Lakeflow Spark. Эта инструкция считывает изменения из источника CDC и применяет их к целевому объекту потоковой передачи.

Синтаксис

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Вы определяете ограничения качества данных для целевого объекта, используя то же CONSTRAINT предложение, что и другие запросы конвейера. См. Управление качеством данных, используя ожидания конвейера.

Поведение по умолчанию для INSERT и UPDATE событий заключается в том, чтобы обновлять события CDC из источника: обновлять все строки в целевой таблице, соответствующие указанным ключам или вставлять новую строку, если соответствующая запись не существует в целевой таблице. Обработку событий DELETE можно задать с условием APPLY AS DELETE WHEN.

Это важно

Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. Для таблиц типа SCD 2 при указании схемы целевой таблицы необходимо также включить столбцы __START_AT и __END_AT с тем же типом данных, что и поле sequence_by.

См . API AUTO CDC: упрощение отслеживания изменений с помощью конвейеров.

Параметры

  • flow_name

    Имя создаваемого потока.

  • source

    Источник данных. Источник должен быть потоковым. Используйте ключевое слово STREAM для использования семантики потоковой передачи для чтения из источника. Если чтение сталкивается с изменением или удалением существующей записи, возникает ошибка. Самое безопасное — читать из статических или источников только для добавления. Для приема данных с коммитами изменений можно использовать Python и SkipChangeCommits опцию для обработки ошибок.

    Дополнительные сведения о потоковой передаче данных см. в разделе Преобразование данных с помощью конвейеров.

  • KEYS

    Столбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Значения в этих столбцах используются для определения того, какие события CDC применяются к определенным записям в целевой таблице.

    Чтобы определить сочетание столбцов, используйте разделенный запятыми список столбцов.

    Это пункт обязательно.

  • IGNORE NULL UPDATES

    Позволяет получать обновления, содержащие подмножество целевых столбцов. При совпадении события CDC с существующей строкой и указан параметр IGNORE NULL UPDATES, столбцы со null значением будут хранить существующие значения в целевом объекте. Это также относится к вложенным столбцам со значением null .

    Это предложение является необязательным.

    Значение по умолчанию — перезаписать существующие столбцы со значениями null .

  • APPLY AS DELETE WHEN

    Указывает, когда событие CDC следует рассматривать как DELETE вместо upsert.

    Для источников SCD типа 2 для обработки данных, поступающих не по порядку, удаленная строка временно сохраняется как метка удаления в базовой таблице Delta, а представление создается в хранилище метаданных, которое фильтрует эти метки удаления. Интервал хранения можно настроить с помощью pipelines.cdc.tombstoneGCThresholdInSecondsсвойства таблицы.

    Это предложение является необязательным.

  • APPLY AS TRUNCATE WHEN

    Указывает, когда событие CDC должно рассматриваться как полная таблица TRUNCATE. Поскольку эта команда активирует полную очистку целевой таблицы, её следует использовать только для конкретных случаев, требующих этой функции.

    Предложение APPLY AS TRUNCATE WHEN поддерживается только для SCD типа 1. SCD типа 2 не поддерживает операцию усечения.

    Это предложение является необязательным.

  • SEQUENCE BY

    Имя столбца, указывающее логический порядок событий CDC в исходных данных. Обработка конвейера использует эту последовательность для обработки событий изменений, поступающих не по порядку.

    Если для упорядочивания требуется несколько столбцов, используйте STRUCT выражение: сначала оно будет упорядочивать по первому полю структуры, затем, если значения одинаковы, по второму полю и так далее.

    Указанные столбцы должны быть сортируемыми типами данных.

    Это предложение является обязательным.

  • COLUMNS

    Указывает подмножество столбцов для включения в целевую таблицу. Вы можете сделать одно из двух:

    • Укажите полный список столбцов, которые необходимо включить: COLUMNS (userId, name, city)
    • Укажите список столбцов, которые следует исключить: COLUMNS * EXCEPT (operation, sequenceNum)

    Это предложение является необязательным.

    По умолчанию все столбцы включаются в целевую таблицу, если не указано условие COLUMNS.

  • STORED AS

    Следует ли хранить записи как SCD типа 1 или SCD типа 2.

    Это предложение является необязательным.

    Значение по умолчанию — SCD тип 1.

  • TRACK HISTORY ON

    Задает подмножество выходных столбцов для создания записей журнала при наличии изменений в указанных столбцах. Вы можете сделать одно из двух:

    • Укажите полный список столбцов для отслеживания: COLUMNS (userId, name, city)
    • Укажите список столбцов, которые следует исключить из отслеживания: COLUMNS * EXCEPT (operation, sequenceNum)

    Это предложение является необязательным. По умолчанию ведется история изменений для всех выходных столбцов при любом изменении, что эквивалентно TRACK HISTORY ON *.

Примеры

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);