Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Utiliza la instrucción AUTO CDC ... INTO para crear un flujo que emplee la funcionalidad de captura de datos de cambios (CDC) de las canalizaciones declarativas de Spark de Lakeflow. Esta instrucción lee los cambios de un origen CDC y los aplica a un destino de streaming.
- Para obtener información sobre CDC, consulte ¿Qué es la captura de datos modificados (CDC)?.
- Para obtener más información sobre el uso de
AUTO CDC, consulte Las API de AUTO CDC: Simplifica la captura de cambios en datos con canalizaciones. - Para obtener más información sobre
CREATE FLOW, consulte CREATE FLOW (canalizaciones).
Syntax
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Las restricciones de calidad de datos para el destino se definen usando la misma cláusula CONSTRAINT que otras consultas de canalización. Consulte Administración de la calidad de los datos con las expectativas de canalización.
El comportamiento predeterminado de INSERT los eventos y UPDATE es actualizar los eventos CDC del origen: actualice las filas de la tabla de destino que coincidan con las claves especificadas o inserte una nueva fila cuando no exista un registro coincidente en la tabla de destino. El control de eventos DELETE se puede especificar con la condición APPLY AS DELETE WHEN.
Importante
Debe declarar una tabla de streaming de destino para aplicar los cambios. Opcionalmente, puede especificar el esquema de la tabla de destino. Para las tablas de tipo SCD 2, al especificar el esquema de la tabla de destino, también debe incluir las columnas __START_AT y __END_AT con el mismo tipo de datos que el campo sequence_by.
Consulte Las API DE AUTO CDC: Simplificación de la captura de datos modificados con canalizaciones.
Parámetros
flow_nameNombre del flujo que se va a crear.
sourceOrigen de los datos. El origen debe ser un origen de streaming . Use la palabra clave STREAM para usar la semántica de streaming para leer desde el origen. Si la lectura encuentra un cambio o eliminación en un registro existente, se produce un error. Es más seguro leer de orígenes estáticos o de solo anexión. Para ingerir datos que tienen confirmaciones de cambios, puede usar Python y la
SkipChangeCommitsopción para controlar errores.Para más información sobre los datos de streaming, consulte Transformación de datos con canalizaciones.
KEYSColumna o combinación de columnas que identifican de forma única una fila en los datos de origen. Los valores de estas columnas se usan para identificar qué eventos CDC se aplican a registros específicos de la tabla de destino.
Para definir una combinación de columnas, use una lista separada por comas de columnas.
Esta cláusula es necesaria.
IGNORE NULL UPDATESPermite la ingesta de actualizaciones que contienen un subconjunto de las columnas de destino. Cuando un evento CDC coincide con una fila existente y se especifica IGNORAR ACTUALIZACIONES NULAS, las columnas con un
null, valor mantendrán su valor existente en el destino. Esto también se aplica a las columnas anidadas con un valornull.Esta cláusula es opcional.
El valor predeterminado es sobrescribir las columnas existentes con
nullvalores.APPLY AS DELETE WHENEspecifica cuándo se debe tratar un evento de CDC como un
DELETEen lugar de un upsert.Para los orígenes de tipo SCD 2, para controlar los datos desordenados, la fila eliminada se conserva temporalmente como una lápida en la tabla Delta subyacente y se crea una vista en la metastore que filtra estas lápidas. El intervalo de retención se puede configurar con la
pipelines.cdc.tombstoneGCThresholdInSecondspropiedad tabla.Esta cláusula es opcional.
APPLY AS TRUNCATE WHENEspecifica cuándo se debe tratar un evento CDC como una tabla
TRUNCATEcompleta. Dado que esta cláusula desencadena un truncamiento completo de la tabla de destino, solo se debe usar para casos de uso específicos que requieren esta funcionalidad.La
APPLY AS TRUNCATE WHENcláusula solo se admite para el tipo SCD 1. El tipo SCD 2 no admite la operación de truncamiento.Esta cláusula es opcional.
SEQUENCE BYNombre de columna que especifica el orden lógico de los eventos CDC en los datos de origen. El procesamiento de canalización usa esta secuenciación para gestionar los eventos de cambio que llegan fuera de orden.
Si se necesitan varias columnas para la secuenciación, use una
STRUCTexpresión: ordenará primero por el primer campo struct, después por el segundo campo si hay un empate, etc.Las columnas especificadas deben ser tipos de datos ordenables.
Esta cláusula es necesaria.
COLUMNSEspecifica un subconjunto de columnas que se van a incluir en la tabla de destino. Puede hacer lo siguiente:
- Especifique la lista completa de columnas que se van a incluir:
COLUMNS (userId, name, city). - Especifique una lista de columnas que se van a excluir:
COLUMNS * EXCEPT (operation, sequenceNum)
Esta cláusula es opcional.
El valor predeterminado es incluir todas las columnas de la tabla de destino cuando no se especifica la
COLUMNScláusula .- Especifique la lista completa de columnas que se van a incluir:
STORED ASSi se van a almacenar registros como tipo SCD 1 o SCD tipo 2.
Esta cláusula es opcional.
El valor predeterminado es SCD de tipo 1.
TRACK HISTORY ONEspecifica un subconjunto de columnas de salida para generar registros de historial cuando hay cambios en esas columnas especificadas. Puede hacer lo siguiente:
- Especifique la lista completa de columnas para realizar el seguimiento:
COLUMNS (userId, name, city). - Especifique una lista de columnas que se excluirán del seguimiento:
COLUMNS * EXCEPT (operation, sequenceNum)
Esta cláusula es opcional. El valor predeterminado es realizar un seguimiento del historial de todas las columnas de salida cuando hay cambios, equivalentes a
TRACK HISTORY ON *.- Especifique la lista completa de columnas para realizar el seguimiento:
Examples
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);