Compartir a través de


CREATE STREAMING TABLE ... FLOW AUTO CDC

Se aplica a:casilla marcada como Sí Databricks SQL

Importante

Esta característica se encuentra en su versión beta. Requiere Databricks Runtime 17.3 y versiones posteriores.

Use la FLOW AUTO CDC cláusula con CREATE STREAMING TABLE para procesar registros de captura de datos modificados (CDC) de un origen en una tabla de streaming.

Anteriormente, la MERGE INTO instrucción se usaba normalmente para procesar registros CDC en Azure Databricks. Sin embargo, MERGE INTO puede producir resultados incorrectos debido a registros fuera de secuencia o requiere una lógica compleja para volver a ordenar los registros.

AUTO CDC simplifica cdc mediante el control automático de registros desordenados. Las claves se especifican para identificar registros, una columna de secuencia para ordenar y si se deben almacenar los resultados como scD tipo 1 (actualizaciones directas) o scD tipo 2 (seguimiento del historial).

Sintaxis

CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

El comportamiento predeterminado de INSERT los eventos y UPDATE es actualizar los eventos CDC del origen: actualice las filas de la tabla de destino que coincidan con las claves especificadas o inserte una nueva fila cuando no exista un registro coincidente en la tabla de destino. El control de eventos DELETE se puede especificar con la condición APPLY AS DELETE WHEN.

Parámetros

  • source

    Origen de los datos. El origen debe ser un origen de streaming. Use la palabra clave STREAM para usar la semántica de streaming para leer desde el origen. Si la lectura encuentra un cambio o eliminación en un registro existente, se produce un error. Es más seguro leer de orígenes estáticos o de solo anexión.

    Para más información sobre los datos de streaming, consulte Transformación de datos con canalizaciones.

  • KEYS

    Columna o combinación de columnas que identifican de forma única una fila en los datos de origen. Los valores de estas columnas se usan para identificar qué eventos CDC se aplican a registros específicos de la tabla de destino.

    Para definir una combinación de columnas, use una lista separada por comas de columnas.

    Esta cláusula es necesaria.

  • IGNORE NULL UPDATES

    Permite la ingesta de actualizaciones que contienen un subconjunto de las columnas de destino. Cuando un evento CDC coincide con una fila existente y IGNORE NULL UPDATES se especifica, las columnas con un null valor conservan sus valores existentes en el destino. Esto también se aplica a las columnas anidadas con un valor null.

    Esta cláusula es opcional.

    El valor predeterminado es sobrescribir las columnas existentes con null valores.

  • APPLY AS DELETE WHEN

    Especifica cuándo se debe tratar un evento de CDC como un DELETE en lugar de un upsert.

    Para los orígenes de tipo SCD 2, para controlar los datos desordenados, la fila eliminada se conserva temporalmente como una lápida en la tabla Delta subyacente y se crea una vista en la metastore que filtra estas lápidas. El intervalo de retención se puede configurar con la pipelines.cdc.tombstoneGCThresholdInSecondspropiedad tabla.

    Esta cláusula es opcional.

  • APPLY AS TRUNCATE WHEN

    Especifica cuándo se debe tratar un evento CDC como una tabla TRUNCATEcompleta. Dado que esta cláusula desencadena un truncamiento completo de la tabla de destino, solo se debe usar para casos de uso específicos que requieren esta funcionalidad.

    La APPLY AS TRUNCATE WHEN cláusula solo se admite para el tipo SCD 1. El tipo SCD 2 no admite la operación de truncamiento.

    Esta cláusula es opcional.

  • SEQUENCE BY

    Nombre de columna que especifica el orden lógico de los eventos CDC en los datos de origen. El procesamiento de canalización usa esta secuenciación para gestionar los eventos de cambio que llegan fuera de orden.

    Si se necesitan varias columnas para la secuenciación, use una STRUCT expresión: ordenará primero por el primer campo struct, después por el segundo campo si hay un empate, etc.

    Las columnas especificadas deben ser tipos de datos ordenables.

    Esta cláusula es necesaria.

  • COLUMNS

    Especifica un subconjunto de columnas que se van a incluir en la tabla de destino. Puede hacer lo siguiente:

    • Especifique la lista completa de columnas que se van a incluir: COLUMNS (userId, name, city).
    • Especifique una lista de columnas que se van a excluir: COLUMNS * EXCEPT (operation, sequenceNum)

    Esta cláusula es opcional.

    El valor predeterminado es incluir todas las columnas de la tabla de destino cuando no se especifica la COLUMNS cláusula .

  • STORED AS

    Si se van a almacenar registros como tipo SCD 1 o SCD tipo 2.

    Esta cláusula es opcional.

    El valor predeterminado es SCD de tipo 1.

  • TRACK HISTORY ON

    Especifica un subconjunto de columnas de salida para generar registros de historial cuando hay cambios en esas columnas especificadas. Puede hacer lo siguiente:

    • Especifique la lista completa de columnas para realizar el seguimiento: COLUMNS (userId, name, city).
    • Especifique una lista de columnas que se excluirán del seguimiento: COLUMNS * EXCEPT (operation, sequenceNum)

    Esta cláusula es opcional. El valor predeterminado es realizar un seguimiento del historial de todas las columnas de salida cuando hay cambios, equivalentes a TRACK HISTORY ON *.

Ejemplos

-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;

-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2;

-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);