Заметка
Доступ к этой странице требует авторизации. Вы можете попробовать войти в систему или изменить каталог.
Доступ к этой странице требует авторизации. Вы можете попробовать сменить директорию.
Используйте инструкцию AUTO CDC ... INTO для создания потока, использующего функциональность CDC (отслеживание изменений в данных) декларативных конвейеров Lakeflow Spark. Эта инструкция считывает изменения из источника CDC и применяет их к целевому объекту потоковой передачи.
- Дополнительные сведения о CDC см. в статье "Что такое запись измененных данных(CDC)?".
- Дополнительные сведения об использовании
AUTO CDCсм. в The AUTO CDC API: упрощает перехват измененных данных с помощью конвейеров. - Для получения более подробной информации см. статью
CREATE FLOW
Синтаксис
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Вы определяете ограничения качества данных для целевого объекта, используя то же CONSTRAINT предложение, что и другие запросы конвейера. См. Управление качеством данных, используя ожидания конвейера.
Поведение по умолчанию для INSERT и UPDATE событий заключается в том, чтобы обновлять события CDC из источника: обновлять все строки в целевой таблице, соответствующие указанным ключам или вставлять новую строку, если соответствующая запись не существует в целевой таблице. Обработку событий DELETE можно задать с условием APPLY AS DELETE WHEN.
Это важно
Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. Для таблиц типа SCD 2 при указании схемы целевой таблицы необходимо также включить столбцы __START_AT и __END_AT с тем же типом данных, что и поле sequence_by.
См . API AUTO CDC: упрощение отслеживания изменений с помощью конвейеров.
Параметры
flow_nameИмя создаваемого потока.
sourceИсточник данных. Источник должен быть потоковым. Используйте ключевое слово STREAM для использования семантики потоковой передачи для чтения из источника. Если чтение сталкивается с изменением или удалением существующей записи, возникает ошибка. Самое безопасное — читать из статических или источников только для добавления. Для приема данных с коммитами изменений можно использовать Python и
SkipChangeCommitsопцию для обработки ошибок.Дополнительные сведения о потоковой передаче данных см. в разделе Преобразование данных с помощью конвейеров.
KEYSСтолбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Значения в этих столбцах используются для определения того, какие события CDC применяются к определенным записям в целевой таблице.
Чтобы определить сочетание столбцов, используйте разделенный запятыми список столбцов.
Это пункт обязательно.
IGNORE NULL UPDATESПозволяет получать обновления, содержащие подмножество целевых столбцов. При совпадении события CDC с существующей строкой и указан параметр IGNORE NULL UPDATES, столбцы со
nullзначением будут хранить существующие значения в целевом объекте. Это также относится к вложенным столбцам со значениемnull.Это предложение является необязательным.
Значение по умолчанию — перезаписать существующие столбцы со значениями
null.APPLY AS DELETE WHENУказывает, когда событие CDC следует рассматривать как
DELETEвместо upsert.Для источников SCD типа 2 для обработки данных, поступающих не по порядку, удаленная строка временно сохраняется как метка удаления в базовой таблице Delta, а представление создается в хранилище метаданных, которое фильтрует эти метки удаления. Интервал хранения можно настроить с помощью
pipelines.cdc.tombstoneGCThresholdInSecondsсвойства таблицы.Это предложение является необязательным.
APPLY AS TRUNCATE WHENУказывает, когда событие CDC должно рассматриваться как полная таблица
TRUNCATE. Поскольку эта команда активирует полную очистку целевой таблицы, её следует использовать только для конкретных случаев, требующих этой функции.Предложение
APPLY AS TRUNCATE WHENподдерживается только для SCD типа 1. SCD типа 2 не поддерживает операцию усечения.Это предложение является необязательным.
SEQUENCE BYИмя столбца, указывающее логический порядок событий CDC в исходных данных. Обработка конвейера использует эту последовательность для обработки событий изменений, поступающих не по порядку.
Если для упорядочивания требуется несколько столбцов, используйте
STRUCTвыражение: сначала оно будет упорядочивать по первому полю структуры, затем, если значения одинаковы, по второму полю и так далее.Указанные столбцы должны быть сортируемыми типами данных.
Это предложение является обязательным.
COLUMNSУказывает подмножество столбцов для включения в целевую таблицу. Вы можете сделать одно из двух:
- Укажите полный список столбцов, которые необходимо включить:
COLUMNS (userId, name, city) - Укажите список столбцов, которые следует исключить:
COLUMNS * EXCEPT (operation, sequenceNum)
Это предложение является необязательным.
По умолчанию все столбцы включаются в целевую таблицу, если не указано условие
COLUMNS.- Укажите полный список столбцов, которые необходимо включить:
STORED ASСледует ли хранить записи как SCD типа 1 или SCD типа 2.
Это предложение является необязательным.
Значение по умолчанию — SCD тип 1.
TRACK HISTORY ONЗадает подмножество выходных столбцов для создания записей журнала при наличии изменений в указанных столбцах. Вы можете сделать одно из двух:
- Укажите полный список столбцов для отслеживания:
COLUMNS (userId, name, city) - Укажите список столбцов, которые следует исключить из отслеживания:
COLUMNS * EXCEPT (operation, sequenceNum)
Это предложение является необязательным. По умолчанию ведется история изменений для всех выходных столбцов при любом изменении, что эквивалентно
TRACK HISTORY ON *.- Укажите полный список столбцов для отслеживания:
Примеры
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);