Compartilhar via


CREATE FLOW (pipelines)

Use a instrução CREATE FLOW para criar fluxos ou backfills para tabelas em um pipeline.

Sintaxe

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

Parâmetros

  • flow_name

    O nome do fluxo a ser criado.

  • COMENTÁRIO

    Uma descrição opcional para o fluxo.

  • AUTO CDC INTO

    Uma declaração AUTO CDC ... INTO que define o fluxo, com um create_auto_cdc_flow_spec. Você deve incluir uma instrução AUTO CDC ... INTO ou uma instrução INSERT INTO . Use AUTO CDC ... INTO quando a semântica de dados de alteração for usada na consulta de origem.

    Para obter mais informações, consulte AUTO CDC INTO (pipelines).

  • target_table

    A tabela a ser atualizada. Essa deve ser uma tabela streaming.

  • INSERT EM

    Define uma consulta de tabela inserida na tabela de destino. Se a opção ONCE não for fornecida, a consulta deverá ser uma consulta de streaming . Use a palavra-chave STREAM para usar a semântica de streaming para ler a fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler de fontes estáticas ou somente de acréscimos. Para ingerir dados que tenham confirmações de alterações, você pode usar Python e a opção SkipChangeCommits para lidar com erros.

    INSERT INTO é mutuamente exclusivo com AUTO CDC ... INTO. Use AUTO CDC ... INTO quando os dados de origem incluírem a funcionalidade de CDC (captura de dados de alteração). Use INSERT INTO quando a origem não o fizer.

    Para obter mais informações sobre dados de fluxo, consulte Transformar dados com pipelines.

  • Uma vez

    Opcionalmente, defina o fluxo como um fluxo único, como um backfill. Usar ONCE altera o fluxo de duas maneiras:

    • A origem query ou create_auto_cdc_flow_spec não é uma tabela de streaming.
    • O fluxo é executado uma vez por padrão. Ao atualizar o pipeline com uma atualização completa, o fluxo ONCE será executado para recriar os dados novamente.

Exemplos

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users BY NAME
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users BY NAME
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;