Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Utilice la instrucción CREATE FLOW
para crear flujos o reposiciones automáticas para las tablas de canalizaciones declarativas de Lakeflow.
Sintaxis
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
Parámetros
flow_name
Nombre del flujo que se va a crear.
COMENTARIO
Descripción opcional del flujo.
-
Una instrucción
AUTO CDC ... INTO
que define el flujo junto con uncreate_auto_cdc_flow_spec
. Debe incluir unaAUTO CDC ... INTO
declaración o unaINSERT INTO
declaración. UseAUTO CDC ... INTO
cuando la consulta de origen use la semántica de datos modificados.Para obtener más información, consulte AUTO CDC INTO (Lakeflow Declarative Pipelines).
target_table
La tabla que se actualizará. Debe ser una tabla de streaming.
INSERT EN
Define una consulta de tabla que se inserta en la tabla de destino. Si no se proporciona la
ONCE
opción , la consulta debe ser una consulta de streaming . Usa la palabra clave STREAM para aplicar la semántica de streaming al leer desde el origen. Si la lectura encuentra un cambio o eliminación en un registro existente, se produce un error. Es más seguro leer de orígenes estáticos o de solo anexión. Para ingerir datos que tienen confirmaciones de cambios, puede usar Python y laSkipChangeCommits
opción para controlar errores.INSERT INTO
es mutuamente excluyente conAUTO CDC ... INTO
. UseAUTO CDC ... INTO
cuando los datos de origen incluyan la funcionalidad de captura de datos modificados (CDC). UsaINSERT INTO
cuando la fuente no lo haga.Para más información sobre los datos de streaming, consulte Transformación de datos con canalizaciones.
Una vez
Opcionalmente, defina el flujo como flujo de bono de datos, como una reposición. Usar
ONCE
cambia el flujo de dos maneras:- El origen
query
ocreate_auto_cdc_flow_spec
no es una tabla de streaming. - El flujo se ejecuta una vez de forma predeterminada. Si la canalización se actualiza con una actualización completa, el flujo
ONCE
se ejecutará de nuevo para volver a crear los datos.
- El origen
Ejemplos
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;