CREATE STREAMING TABLE ... FLOW AUTO CDC

Applies to: Databricks SQL

Important

This feature is in Beta. Requires Databricks Runtime 17.3 and above.

Use the FLOW AUTO CDC clause with CREATE STREAMING TABLE to process change data capture (CDC) records from a source into a streaming table.

Previously, the MERGE INTO statement was commonly used to process CDC records on Azure Databricks. However, MERGE INTO can produce incorrect results when records arrive out of sequence, or it requires complex logic to re-order them.

AUTO CDC simplifies CDC by automatically handling out-of-order records. You specify keys to identify records, a sequence column for ordering, and whether to store results as SCD type 1 (direct updates) or SCD type 2 (history tracking).

Syntax

CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified keys, or insert a new row when no matching record exists in the target table. Handling for DELETE events can be specified with the APPLY AS DELETE WHEN clause.

Parameters

  • source

    The source for the data. The source must be a streaming source. Use the STREAM keyword to read from the source with streaming semantics. If the read encounters a change to or deletion of an existing record, an error is thrown. It is safest to read from static or append-only sources.

    For more information on streaming data, see Transform data with pipelines.

  • KEYS

    The column or combination of columns that uniquely identify a row in the source data. The values in these columns are used to identify which CDC events apply to specific records in the target table.

    To define a combination of columns, use a comma-separated list of columns.

    This clause is required.
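
    For example, a composite key is declared as a comma-separated list. The region and userId columns below are hypothetical, for illustration only:

    -- Sketch: (region, userId) together uniquely identify a source row
    > CREATE OR REFRESH STREAMING TABLE target
      FLOW AUTO CDC
      FROM stream(cdc_data.users)
      KEYS (region, userId)
      SEQUENCE BY sequenceNum;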

  • IGNORE NULL UPDATES

    Allows ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and IGNORE NULL UPDATES is specified, columns with a null value retain their existing values in the target. This also applies to nested columns with a null value.

    This clause is optional.

    The default is to overwrite existing columns with null values.
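
    As a sketch, assuming a source that sends partial updates with NULL for untouched columns:

    -- Sketch: NULL columns in matching CDC events keep their existing target values
    > CREATE OR REFRESH STREAMING TABLE target
      FLOW AUTO CDC
      FROM stream(cdc_data.users)
      KEYS (userId)
      IGNORE NULL UPDATES
      SEQUENCE BY sequenceNum;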

  • APPLY AS DELETE WHEN

    Specifies when a CDC event should be treated as a DELETE rather than an upsert.

    For SCD type 2 sources, to handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones. The retention interval can be configured with the pipelines.cdc.tombstoneGCThresholdInSeconds table property.

    This clause is optional.
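
    As a sketch, assuming the source carries an operation column, delete handling and a longer tombstone retention (here five days) can be combined:

    -- Sketch: treat operation = "DELETE" as a delete;
    -- retain tombstones for 432000 seconds (5 days)
    > CREATE OR REFRESH STREAMING TABLE target
      TBLPROPERTIES(pipelines.cdc.tombstoneGCThresholdInSeconds = "432000")
      FLOW AUTO CDC
      FROM stream(cdc_data.users)
      KEYS (userId)
      APPLY AS DELETE WHEN operation = "DELETE"
      SEQUENCE BY sequenceNum
      STORED AS SCD TYPE 2;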

  • APPLY AS TRUNCATE WHEN

    Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality.

    The APPLY AS TRUNCATE WHEN clause is supported only for SCD type 1. SCD type 2 does not support the truncate operation.

    This clause is optional.
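
    For example, assuming a hypothetical operation column whose value "TRUNCATE" signals a full table reset:

    -- Sketch: SCD type 1 only; a "TRUNCATE" event clears the target table
    > CREATE OR REFRESH STREAMING TABLE target
      FLOW AUTO CDC
      FROM stream(cdc_data.users)
      KEYS (userId)
      APPLY AS DELETE WHEN operation = "DELETE"
      APPLY AS TRUNCATE WHEN operation = "TRUNCATE"
      SEQUENCE BY sequenceNum
      STORED AS SCD TYPE 1;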

  • SEQUENCE BY

    The column name specifying the logical order of CDC events in the source data. The pipeline processing uses this sequencing to handle change events that arrive out of order.

    If multiple columns are needed for sequencing, use a STRUCT expression: ordering compares the first struct field, then the second field to break ties, and so on.

    Specified columns must be of sortable data types.

    This clause is required.
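
    For example, to break ties on a primary sequence number with a secondary timestamp, where sourceTimestamp is a hypothetical column:

    -- Sketch: order by sequenceNum first, then sourceTimestamp on ties
    > CREATE OR REFRESH STREAMING TABLE target
      FLOW AUTO CDC
      FROM stream(cdc_data.users)
      KEYS (userId)
      SEQUENCE BY struct(sequenceNum, sourceTimestamp)
      STORED AS SCD TYPE 1;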

  • COLUMNS

    Specifies a subset of columns to include in the target table. You can either:

    • Specify the complete list of columns to include: COLUMNS (userId, name, city).
    • Specify a list of columns to exclude: COLUMNS * EXCEPT (operation, sequenceNum).

    This clause is optional.

    The default is to include all columns in the target table when the COLUMNS clause is not specified.

  • STORED AS

    Whether to store records as SCD type 1 or SCD type 2.

    This clause is optional.

    The default is SCD type 1.

  • TRACK HISTORY ON

    Specifies a subset of output columns for which history records are generated when any of those columns change. You can either:

    • Specify the complete list of columns to track: TRACK HISTORY ON (userId, name, city).
    • Specify a list of columns to exclude from tracking: TRACK HISTORY ON * EXCEPT (operation, sequenceNum).

    This clause is optional. The default is to track history for all the output columns when there are any changes, equivalent to TRACK HISTORY ON *.

Examples

-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;

-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2;

-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);