Compartir vía


AUTO CDC INTO (canalizaciones)

Utiliza la instrucción AUTO CDC ... INTO para crear un flujo que emplee la funcionalidad de captura de datos de cambios (CDC) de las canalizaciones declarativas de Spark de Lakeflow. Esta instrucción lee los cambios de un origen CDC y los aplica a un destino de streaming.

Syntax

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Las restricciones de calidad de datos para el destino se definen usando la misma cláusula CONSTRAINT que otras consultas de canalización. Consulte Administración de la calidad de los datos con las expectativas de canalización.

El comportamiento predeterminado de INSERT los eventos y UPDATE es actualizar los eventos CDC del origen: actualice las filas de la tabla de destino que coincidan con las claves especificadas o inserte una nueva fila cuando no exista un registro coincidente en la tabla de destino. El control de eventos DELETE se puede especificar con la condición APPLY AS DELETE WHEN.

Importante

Debe declarar una tabla de streaming de destino para aplicar los cambios. Opcionalmente, puede especificar el esquema de la tabla de destino. Para las tablas de tipo SCD 2, al especificar el esquema de la tabla de destino, también debe incluir las columnas __START_AT y __END_AT con el mismo tipo de datos que el campo sequence_by.

Consulte Las API DE AUTO CDC: Simplificación de la captura de datos modificados con canalizaciones.

Parámetros

  • flow_name

    Nombre del flujo que se va a crear.

  • source

    Origen de los datos. El origen debe ser un origen de streaming . Use la palabra clave STREAM para usar la semántica de streaming para leer desde el origen. Si la lectura encuentra un cambio o eliminación en un registro existente, se produce un error. Es más seguro leer de orígenes estáticos o de solo anexión. Para ingerir datos que tienen confirmaciones de cambios, puede usar Python y la SkipChangeCommits opción para controlar errores.

    Para más información sobre los datos de streaming, consulte Transformación de datos con canalizaciones.

  • KEYS

    Columna o combinación de columnas que identifican de forma única una fila en los datos de origen. Los valores de estas columnas se usan para identificar qué eventos CDC se aplican a registros específicos de la tabla de destino.

    Para definir una combinación de columnas, use una lista separada por comas de columnas.

    Esta cláusula es necesaria.

  • IGNORE NULL UPDATES

    Permite la ingesta de actualizaciones que contienen un subconjunto de las columnas de destino. Cuando un evento CDC coincide con una fila existente y se especifica IGNORAR ACTUALIZACIONES NULAS, las columnas con un null, valor mantendrán su valor existente en el destino. Esto también se aplica a las columnas anidadas con un valor null.

    Esta cláusula es opcional.

    El valor predeterminado es sobrescribir las columnas existentes con null valores.

  • APPLY AS DELETE WHEN

    Especifica cuándo se debe tratar un evento de CDC como un DELETE en lugar de un upsert.

    Para los orígenes de tipo SCD 2, para controlar los datos desordenados, la fila eliminada se conserva temporalmente como una lápida en la tabla Delta subyacente y se crea una vista en la metastore que filtra estas lápidas. El intervalo de retención se puede configurar con la pipelines.cdc.tombstoneGCThresholdInSecondspropiedad tabla.

    Esta cláusula es opcional.

  • APPLY AS TRUNCATE WHEN

    Especifica cuándo se debe tratar un evento CDC como una tabla TRUNCATEcompleta. Dado que esta cláusula desencadena un truncamiento completo de la tabla de destino, solo se debe usar para casos de uso específicos que requieren esta funcionalidad.

    La APPLY AS TRUNCATE WHEN cláusula solo se admite para el tipo SCD 1. El tipo SCD 2 no admite la operación de truncamiento.

    Esta cláusula es opcional.

  • SEQUENCE BY

    Nombre de columna que especifica el orden lógico de los eventos CDC en los datos de origen. El procesamiento de canalización usa esta secuenciación para gestionar los eventos de cambio que llegan fuera de orden.

    Si se necesitan varias columnas para la secuenciación, use una STRUCT expresión: ordenará primero por el primer campo struct, después por el segundo campo si hay un empate, etc.

    Las columnas especificadas deben ser tipos de datos ordenables.

    Esta cláusula es necesaria.

  • COLUMNS

    Especifica un subconjunto de columnas que se van a incluir en la tabla de destino. Puede hacer lo siguiente:

    • Especifique la lista completa de columnas que se van a incluir: COLUMNS (userId, name, city).
    • Especifique una lista de columnas que se van a excluir: COLUMNS * EXCEPT (operation, sequenceNum)

    Esta cláusula es opcional.

    El valor predeterminado es incluir todas las columnas de la tabla de destino cuando no se especifica la COLUMNS cláusula .

  • STORED AS

    Si se van a almacenar registros como tipo SCD 1 o SCD tipo 2.

    Esta cláusula es opcional.

    El valor predeterminado es SCD de tipo 1.

  • TRACK HISTORY ON

    Especifica un subconjunto de columnas de salida para generar registros de historial cuando hay cambios en esas columnas especificadas. Puede hacer lo siguiente:

    • Especifique la lista completa de columnas para realizar el seguimiento: COLUMNS (userId, name, city).
    • Especifique una lista de columnas que se excluirán del seguimiento: COLUMNS * EXCEPT (operation, sequenceNum)

    Esta cláusula es opcional. El valor predeterminado es realizar un seguimiento del historial de todas las columnas de salida cuando hay cambios, equivalentes a TRACK HISTORY ON *.

Examples

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);