This page covers advanced topics for working with AUTO CDC and AUTO CDC FROM SNAPSHOT target tables, including DML operations, reading change data feeds, and monitoring processing metrics. For an introduction to the AUTO CDC APIs, see The AUTO CDC APIs: Simplify change data capture with pipelines.
Add, change, or delete data in a target streaming table
If your pipeline publishes tables to Unity Catalog, you can use data manipulation language (DML) statements, including insert, update, delete, and merge statements, to modify the target streaming tables created by AUTO CDC ... INTO statements.
Note
- DML statements that modify the table schema of a streaming table are not supported. Ensure that your DML statements do not attempt to evolve the table schema.
- DML statements that update a streaming table can be run only in a shared Unity Catalog cluster or a SQL warehouse using Databricks Runtime 13.3 LTS and above.
- Because streaming requires append-only data sources, if your processing requires streaming from a source streaming table with changes (for example, by DML statements), set the skipChangeCommits flag when reading the source streaming table. When skipChangeCommits is set, transactions that delete or modify records on the source table are ignored. If your processing does not require a streaming table, you can use a materialized view (which does not have the append-only restriction) as the target table.
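As a sketch of the materialized view alternative mentioned above (the catalog, schema, and table names are hypothetical), a pipeline can declare a materialized view over a source that receives updates and deletes, because materialized views do not have the append-only restriction:

```sql
-- Hypothetical example: a materialized view target tolerates upstream
-- updates and deletes because it is recomputed rather than streamed.
CREATE OR REFRESH MATERIALIZED VIEW customers_current AS
SELECT id, name, email
FROM my_catalog.my_schema.customers_source;
```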
Because Lakeflow Spark Declarative Pipelines uses a specified SEQUENCE BY column and propagates appropriate sequencing values to the __START_AT and __END_AT columns of the target table (for SCD Type 2), you must ensure that DML statements use valid values for these columns to maintain the proper ordering of records. See How AUTO CDC works.
For more information about using DML statements with streaming tables, see Add, change, or delete data in a streaming table.
The following example inserts an active record with a start sequence of 5:
```sql
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
```
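To keep SCD Type 2 ordering consistent, a manual insert of a new active record is typically paired with closing out the previous version of the same key. A hedged sketch, reusing the hypothetical my_streaming_table from the example above:

```sql
-- Close out the previously active record for the same key by setting its
-- __END_AT to the new record's start sequence (5 in the example above).
UPDATE my_streaming_table
SET __END_AT = 5
WHERE id = 123 AND __END_AT IS NULL AND __START_AT < 5;
```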
Tip
If you need to rename the __START_AT and __END_AT columns in your SCD Type 2 target table (for example, to match downstream schema requirements), create a view over the target table:
```sql
CREATE VIEW my_employees_view AS
SELECT
  -- EXCEPT drops the original columns so they are truly renamed,
  -- rather than duplicated alongside the aliases.
  * EXCEPT (__START_AT, __END_AT),
  __START_AT AS valid_from,
  __END_AT AS valid_to
FROM my_scd2_target_table;
```
Read a change data feed from an AUTO CDC target table
In Databricks Runtime 15.2 and above, you can read a change data feed from a streaming table that is the target of AUTO CDC or AUTO CDC FROM SNAPSHOT queries in the same way that you read a change data feed from other Delta tables. The following are required to read the change data feed from a target streaming table:
- The target streaming table must be published to Unity Catalog. See Use Unity Catalog with pipelines.
- The compute that reads the change data feed must use Databricks Runtime 15.2 or above. If you read the change data feed in a different pipeline, that pipeline must be configured to use Databricks Runtime 15.2 or above.
You read the change data feed from a target streaming table that was created in Lakeflow Spark Declarative Pipelines the same way as reading a change data feed from other Delta tables. To learn more about using the Delta change data feed functionality, including examples in Python and SQL, see Use Delta Lake change data feed on Azure Databricks.
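For example, the standard Delta change data feed read works against the target streaming table. A minimal SQL sketch, assuming a hypothetical Unity Catalog table name and an illustrative starting version:

```sql
-- Read changes starting at table version 2; the table name and version
-- number here are illustrative.
SELECT *
FROM table_changes('my_catalog.my_schema.dp_cdc_target', 2);
```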
Note
The change data feed record includes metadata identifying the type of change event. When a record is updated in a table, the metadata for the associated change records typically includes _change_type values set to update_preimage and update_postimage events.
However, the _change_type values are different if updates are made to the target streaming table that include changing primary key values. When changes include updates to primary keys, the _change_type metadata fields are set to insert and delete events. Changes to primary keys can occur when manual updates are made to one of the key fields with an UPDATE or MERGE statement or, for SCD type 2 tables, when the __start_at field changes to reflect an earlier start sequence value.
The AUTO CDC query determines the primary key values, which differ for SCD type 1 and SCD type 2 processing:
| SCD Type | Primary key |
|---|---|
| SCD type 1 | For the Python interface, the primary key is the value of the keys parameter in the create_auto_cdc_flow() function. For the SQL interface, the primary key is the columns defined by the KEYS clause in the AUTO CDC ... INTO statement. |
| SCD type 2 | The primary key is the keys parameter or KEYS clause plus the return value from the coalesce(__START_AT, __END_AT) operation, where __START_AT and __END_AT are the corresponding columns from the target streaming table. |
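For SCD type 2, this means change feed records for the same business key can be correlated using the composite key described above. A sketch, assuming hypothetical table and key column names:

```sql
-- The effective primary key for SCD type 2 change records is the KEYS
-- columns plus coalesce(__START_AT, __END_AT).
SELECT
  id,
  coalesce(__START_AT, __END_AT) AS version_key,
  _change_type,
  _commit_version
FROM table_changes('my_catalog.my_schema.my_scd2_target_table', 0)
ORDER BY id, version_key, _commit_version;
```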
Get data about records processed by a CDC query in pipelines
Note
The following metrics are captured only by AUTO CDC queries and not by AUTO CDC FROM SNAPSHOT queries.
The following metrics are captured by AUTO CDC queries:
- num_upserted_rows: The number of output rows upserted into the dataset during an update.
- num_deleted_rows: The number of existing output rows deleted from the dataset during an update.
The num_output_rows metric, output for non-CDC flows, is not captured for AUTO CDC queries.
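These metrics can be inspected in the pipeline event log. A hedged sketch using the event_log table-valued function; the JSON field paths and the example table name are illustrative and may vary by release:

```sql
-- Query flow_progress events for a pipeline that updates this table;
-- details is a JSON column, queried here with the colon path syntax.
SELECT
  timestamp,
  details:flow_progress.metrics.num_upserted_rows AS num_upserted_rows,
  details:flow_progress.metrics.num_deleted_rows AS num_deleted_rows
FROM event_log(TABLE(my_catalog.my_schema.dp_cdc_target))
WHERE event_type = 'flow_progress';
```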
What data objects are used for CDC processing in a pipeline?
When you declare the target table in the Hive metastore, two data structures are created:
- A view using the name assigned to the target table.
- An internal backing table used by the pipeline to manage CDC processing. This table is named by prepending __apply_changes_storage_ to the target table name.
For example, if you declare a target table named dp_cdc_target, you see a view named dp_cdc_target and a table named __apply_changes_storage_dp_cdc_target in the metastore. Query the view to access the processed data. Do not modify the backing table directly.
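To see both objects, you can list metastore objects matching the target name. A sketch using the example names above and a hypothetical schema name:

```sql
-- The view exposes the processed data; the backing table is internal.
SHOW TABLES IN my_schema LIKE '*dp_cdc_target*';

-- Query the view, never the __apply_changes_storage_ backing table.
SELECT * FROM my_schema.dp_cdc_target;
```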
Note
These data structures apply only to AUTO CDC processing, not AUTO CDC FROM SNAPSHOT processing. They also apply only to Hive metastore, not Unity Catalog.