Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Use a CREATE FLOW instrução para criar fluxos ou backfills para tabelas em um pipeline.
Sintaxe
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
Parâmetros
flow_name
O nome do fluxo a ser criado.
COMENTAR
Uma descrição opcional para o fluxo.
-
Uma
AUTO CDC ... INTOdeclaração que define o fluxo, com umcreate_auto_cdc_flow_spec. Você deve incluir umaAUTO CDC ... INTOdeclaração ou umaINSERT INTOdeclaração. UseAUTO CDC ... INTOquando a consulta de origem usa a semântica de dados de alteração.Para obter mais informações, consulte AUTO CDC INTO (pipelines).
target_table
A tabela a ser atualizada. Esta deve ser uma tabela de Streaming.
INSERT EM
Define uma consulta de tabela que é inserida na tabela de destino. Se a
ONCEopção não for fornecida, a consulta deverá ser uma consulta de streaming . Utilize a palavra-chave STREAM para aplicar a semântica de transmissão e ler a partir da fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler a partir de fontes estáticas ou apenas de anexação. Para ingerir dados que têm confirmações de alteração, podes usar Python e a opçãoSkipChangeCommitspara manipular erros.INSERT INTOé mutuamente exclusivo comAUTO CDC ... INTO. UseAUTO CDC ... INTOquando os dados de origem incluírem a funcionalidade de captura de dados de alteração (CDC). UseINSERT INTOquando a fonte não o fizer.Para obter mais informações sobre a transmissão de dados, consulte Transformar dados com pipelines.
UMA VEZ
Opcionalmente, defina o fluxo como um fluxo único, como um preenchimento adicional. O uso
ONCEaltera o fluxo de duas maneiras:- A fonte
queryoucreate_auto_cdc_flow_specnão é uma tabela de streaming. - O fluxo é executado uma vez por padrão. Se o pipeline for atualizado com uma renovação completa, o fluxo
ONCEserá executado novamente para recriar os dados.
- A fonte
Examples
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;