Заметка
Доступ к этой странице требует авторизации. Вы можете попробовать войти в систему или изменить каталог.
Доступ к этой странице требует авторизации. Вы можете попробовать сменить директорию.
Используйте оператор CREATE FLOW для создания потоков или дозаполнения для таблиц в конвейере.
Синтаксис
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
Параметры
flow_name
Имя создаваемого потока.
КОММЕНТАРИЙ
Необязательное описание потока.
-
Оператор
AUTO CDC ... INTO, определяющий поток с помощью оператораcreate_auto_cdc_flow_spec. Необходимо включить инструкциюAUTO CDC ... INTOили инструкциюINSERT INTO. ИспользуетсяAUTO CDC ... INTO, когда исходный запрос использует семантику измененных данных.Дополнительные сведения см. в разделе AUTO CDC INTO (конвейеры).
target_table
Таблица, которую нужно обновить. Это должна быть таблица потоковой передачи.
INSERT В
Определяет запрос таблицы, вставленный в целевую таблицу.
ONCEЕсли параметр не указан, запрос должен быть потоковым запросом. Используйте ключевое слово STREAM для использования семантики потоковой передачи для чтения из источника. Если чтение сталкивается с изменением или удалением существующей записи, возникает ошибка. Самое безопасное — читать из статических или источников только для добавления. Для приема данных с коммитами изменений можно использовать Python иSkipChangeCommitsопцию для обработки ошибок.INSERT INTOвзаимоисключающ сAUTO CDC ... INTO. ИспользуйтеAUTO CDC ... INTO, когда исходные данные включают функции отслеживания измененных данных (CDC). ИспользуйтеINSERT INTO, когда источник этого не делает.Дополнительные сведения о потоковой передаче данных см. в разделе Преобразование данных с помощью конвейеров.
ОДИН РАЗ
При необходимости определите поток как единовременный поток, например восполнение. Использование
ONCEменяет поток двумя способами:- Источник
queryилиcreate_auto_cdc_flow_specне является потоковой таблицей. - Поток выполняется один раз по умолчанию. Если конвейер обновляется с полным обновлением,
ONCEпоток будет выполняться повторно для восстановления данных.
- Источник
Примеры
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;