Поделиться через


СОЗДАЙТЕ ПОТОК (конвейеры)

Используйте оператор CREATE FLOW для создания потоков или дозаполнения для таблиц в конвейере.

Синтаксис

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

Параметры

  • flow_name

    Имя создаваемого потока.

  • КОММЕНТАРИЙ

    Необязательное описание потока.

  • AUTO CDC INTO

    Оператор AUTO CDC ... INTO , определяющий поток с помощью оператора create_auto_cdc_flow_spec. Необходимо включить инструкцию AUTO CDC ... INTO или инструкцию INSERT INTO . Используется AUTO CDC ... INTO , когда исходный запрос использует семантику измененных данных.

    Дополнительные сведения см. в разделе AUTO CDC INTO (конвейеры).

  • target_table

    Таблица, которую нужно обновить. Это должна быть таблица потоковой передачи.

  • INSERT В

    Определяет запрос таблицы, вставленный в целевую таблицу. ONCE Если параметр не указан, запрос должен быть потоковым запросом. Используйте ключевое слово STREAM для использования семантики потоковой передачи для чтения из источника. Если чтение сталкивается с изменением или удалением существующей записи, возникает ошибка. Самое безопасное — читать из статических или источников только для добавления. Для приема данных с коммитами изменений можно использовать Python и SkipChangeCommits опцию для обработки ошибок.

    INSERT INTO взаимоисключающ с AUTO CDC ... INTO. Используйте AUTO CDC ... INTO , когда исходные данные включают функции отслеживания измененных данных (CDC). Используйте INSERT INTO, когда источник этого не делает.

    Дополнительные сведения о потоковой передаче данных см. в разделе Преобразование данных с помощью конвейеров.

  • ОДИН РАЗ

    При необходимости определите поток как единовременный поток, например восполнение. Использование ONCE меняет поток двумя способами:

    • Источник query или create_auto_cdc_flow_spec не является потоковой таблицей.
    • Поток выполняется один раз по умолчанию. Если конвейер обновляется с полным обновлением, ONCE поток будет выполняться повторно для восстановления данных.

Примеры

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;