Compartir a través de


CREAR FLUJO (canalizaciones)

Use la CREATE FLOW instrucción para crear flujos o rellenos para tablas en una canalización.

Syntax

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

Parámetros

  • flow_name

    Nombre del flujo que se va a crear.

  • COMENTARIO

    Descripción opcional del flujo.

  • AUTO CDC INTO

    Instrucción AUTO CDC ... INTO que define el flujo, con un create_auto_cdc_flow_spec. Debe incluir una instrucción AUTO CDC ... INTO o INSERT INTO. Use AUTO CDC ... INTO cuando la consulta de origen use la semántica de datos modificados.

    Para más información, consulte AUTO CDC INTO (canalizaciones).

  • target_table

    Tabla que se va a actualizar. Debe ser una tabla de Streaming.

  • INSERT EN

    Define una consulta de tabla que se inserta en la tabla de destino. Si no se proporciona la ONCE opción , la consulta debe ser una consulta de streaming . Use la palabra clave STREAM para usar la semántica de streaming para leer desde el origen. Si la lectura encuentra un cambio o eliminación en un registro existente, se produce un error. Es más seguro leer de orígenes estáticos o de solo anexión. Para ingerir datos que tienen confirmaciones de cambios, puede usar Python y la SkipChangeCommits opción para controlar errores.

    INSERT INTO es mutuamente excluyente con AUTO CDC ... INTO. Use AUTO CDC ... INTO cuando los datos de origen incluyan la funcionalidad de captura de datos modificados (CDC). Utilice INSERT INTO cuando el origen no lo haga.

    Para más información sobre los datos de streaming, consulte Transformación de datos con canalizaciones.

  • Una vez

    Opcionalmente, defina el flujo como un flujo de una sola vez, como un reposición. El uso de ONCE cambia el flujo de dos maneras:

    • El origen query o create_auto_cdc_flow_spec no es una tabla de streaming.
    • El flujo se ejecuta una vez de forma predeterminada. Si la canalización se actualiza con una actualización completa, el ONCE flujo se ejecutará de nuevo para volver a crear los datos.

Examples

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;