Change data feed (CDF) tracks row-level changes between versions of a Delta Lake table or Apache Iceberg v3 table.
Azure Databricks supports two approaches:
- Automatic change data feed: Computes changes during table reads using row lineage metadata. This doesn't require individual table configuration and works on Delta Lake and Apache Iceberg v3 tables. See Automatic change data feed.
- Legacy change data feed: Materializes changes during table writes. Only supports Delta Lake tables. Requires individual table configuration. See Legacy change data feed for Delta Lake.
You can use change data feed for common data use cases including:
- Incremental ETL pipelines that process only the rows that changed since the last pipeline run.
- Audit trails that track data modifications for compliance and governance requirements.
- Data replication workloads that sync changes to downstream tables, caches, or external systems.
Automatic change data feed
Important
This feature is in Public Preview. Workspace admins can control access to this feature from the Previews page. See Manage Azure Databricks previews.
Automatic change data feed computes row-level changes at query time, rather than at write time, using row tracking for Delta Lake and row lineage for Apache Iceberg v3. Unlike legacy change data feed, automatic change data feed doesn't require individual table configuration and works on Delta Lake tables and Apache Iceberg v3 tables.
Because changes are not computed on every write for MERGE INTO and UPDATE operations, automatic change data feed improves write performance and reduces storage costs, compared to legacy change data feed.
Automatic change data feed uses the same table_changes() and readChangeFeed APIs as legacy change data feed and works with batch queries, Structured Streaming, and Databricks-to-Databricks Delta Lake Sharing. See Read changes in batch queries and Incrementally process change data.
Requirements
- Databricks Runtime 18 or above
- A supported table format that is registered in Unity Catalog:
- A managed table in Delta Lake format with row tracking enabled or in Iceberg v3 format.
- An external table in Delta Lake format with row tracking enabled.
See Databricks Unity Catalog table types.
Note
Change data feed isn't part of the Apache Iceberg spec. Azure Databricks readers can query automatic change data feed for Apache Iceberg v3 tables, but external Iceberg readers cannot. See the Iceberg table spec.
For Delta Lake, only Azure Databricks readers can query automatic change data feed.
Use change data feed
To use change data feed, verify you are using a table that meets the requirements. See Requirements.
To batch read change data feed, do the following:
Python
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("<table_name>")
Scala
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("<table_name>")
SQL
SELECT * FROM table_changes('<table_name>', 0)
For more information on batch reads for change data feed, see Read changes in batch queries.
To stream read change data feed, do the following:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
For more information on streaming reads for change data feed, see Incrementally process change data.
Migrate from legacy change data feed
To migrate a Delta Lake table from legacy change data feed to automatic change data feed, do the following:
- Verify that your table meets the requirements.
- Turn off legacy change data feed by running the following command:
ALTER TABLE <table_name> UNSET TBLPROPERTIES ('delta.enableChangeDataFeed');
You can't use both legacy and automatic change data feeds together.
Change data feed schema
When you read from the change data feed for a table, the query uses the schema for the latest table version. Azure Databricks supports most schema change and evolution operations, but tables with column mapping have limitations. See Tables with column mapping.
In addition to the data columns from the schema of the Delta Lake table, change data feed contains metadata columns that identify the type of change event:
| Column name | Type | Values |
|---|---|---|
| _change_type | String | insert, update_preimage, update_postimage, or delete. The preimage is the value before the update, and the postimage is the value after the update. |
| _commit_version | Long | The Delta log or table version containing the change. |
| _commit_timestamp | Timestamp | The timestamp when the commit was created. |
If the table schema contains columns with the same names as these metadata columns, you can't use change data feed on that table. Before you turn on change data feed, rename the conflicting columns in your table.
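To illustrate how a consumer might interpret the `_change_type` values, the following is a minimal plain-Python sketch that replays change rows onto an in-memory dictionary. It does not call Spark. The function name `apply_changes`, the `key_column` parameter, and the sample rows are hypothetical and exist only for this illustration.

```python
def apply_changes(state, changes, key_column="id"):
    """Replay change rows onto a dict keyed by key_column (plain-Python model)."""
    metadata = {"_change_type", "_commit_version", "_commit_timestamp"}
    # Process commits in version order so that later versions win.
    for row in sorted(changes, key=lambda r: r["_commit_version"]):
        kind = row["_change_type"]
        data = {k: v for k, v in row.items() if k not in metadata}
        if kind in ("insert", "update_postimage"):
            state[data[key_column]] = data
        elif kind == "delete":
            state.pop(data[key_column], None)
        # update_preimage carries the old value and needs no action here.
    return state


# Hypothetical change rows, shaped like rows read from a change data feed.
changes = [
    {"id": 1, "name": "Ann", "_change_type": "insert", "_commit_version": 1},
    {"id": 2, "name": "Bob", "_change_type": "insert", "_commit_version": 1},
    {"id": 1, "name": "Ann", "_change_type": "update_preimage", "_commit_version": 2},
    {"id": 1, "name": "Anna", "_change_type": "update_postimage", "_commit_version": 2},
    {"id": 2, "name": "Bob", "_change_type": "delete", "_commit_version": 3},
]
result = apply_changes({}, changes)
print(result)  # {1: {'id': 1, 'name': 'Anna'}}
```

The sketch ignores `update_preimage` rows because the postimage alone is enough to maintain the latest state. An audit workload would likely keep both images.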
Incrementally process change data
Databricks recommends that you use change data feed in combination with Structured Streaming to incrementally process changes from tables. You must use Structured Streaming for Azure Databricks to automatically track versions for your table's change data feed. For CDC processing with SCD type 1 or type 2 tables, see The AUTO CDC APIs: Simplify change data capture with pipelines.
When the stream first starts, change data feed returns the latest snapshot of the table as INSERT records and then returns future changes as change data. Change data feed commits change data and new data rows to the table transaction log at the same time.
To configure a stream to read the change data feed of a table, set the option readChangeFeed to true as follows:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
Rate limits
Azure Databricks supports rate limits (maxFilesPerTrigger, maxBytesPerTrigger) and excludeRegex when reading change data. For a complete list of streaming Delta Lake options, see Delta Lake.
Optionally, you can specify a starting version. See Specify a starting version. For versions other than the starting snapshot, rate limits apply atomically to entire commits. Either the current batch includes the entire commit, or the current batch defers the commit to the next batch.
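The all-or-nothing treatment of commits can be pictured with a toy model in plain Python. This is not how the runtime is implemented. The function `plan_batches` and the per-commit file counts are hypothetical, and the sketch assumes that a commit larger than the limit is still admitted on its own so that the stream can make progress.

```python
def plan_batches(commits, max_files_per_trigger):
    """Group (version, file_count) commits into batches without splitting a commit."""
    batches, current, used = [], [], 0
    for version, file_count in commits:
        # Defer the whole commit if it does not fit in the current batch.
        if current and used + file_count > max_files_per_trigger:
            batches.append(current)
            current, used = [], 0
        # Assumption: an oversized commit is still admitted alone.
        current.append(version)
        used += file_count
    if current:
        batches.append(current)
    return batches


# Hypothetical commits as (version, number of files).
batches = plan_batches([(10, 3), (11, 4), (12, 2), (13, 9)], max_files_per_trigger=6)
print(batches)  # [[10], [11, 12], [13]]
```

Version 11 does not fit next to version 10 under a limit of 6 files, so the model moves the entire commit to the next batch rather than splitting it.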
Replay table history
A change data feed is not intended to serve as a permanent record of all changes to a table. It only records changes that occur after change data feed was enabled. You can start a new streaming read to capture the current version and all subsequent changes.
Records in the change data feed are transient and only accessible for a specified retention window. Transaction logs remove table versions and their corresponding change data feed versions at regular intervals. When a version is removed, you can no longer read the change data feed for that version.
Archive change data for permanent history
If your use case requires you to maintain a permanent history of all changes to a table, use incremental logic to write records from the change data feed to a new table.
The following example demonstrates using trigger.AvailableNow to process available data as a batch workload for auditing or full change replays:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", "<checkpoint-path>")
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", "<checkpoint-path>")
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Specify a starting version
To read changes from a specific point, specify a starting version using either a timestamp or version number. Starting versions are required for batch reads. Optionally, you can specify an ending version to limit the range. To learn more about table history, see What is time travel?.
When you configure Structured Streaming workloads that use change data feed, specifying a starting version might affect processing performance:
- New data processing pipelines typically benefit from the default behavior, which records all existing records in the table as INSERT operations when the stream first starts.
- If your target table already contains all the records with appropriate changes up to a certain point, specify a starting version to avoid processing the source table state as INSERT events.
The following example shows how to recover from a streaming failure with a corrupted checkpoint. In this example, assume the following conditions:
- Change data feed was enabled on the source table at table creation.
- The target downstream table processed all changes up to and including version 75.
- Version history for the source table is available for versions 70 and above.
When you define the write stream to the existing target table, you must specify a new checkpoint location:
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
Important
If you specify a starting version and that version is not available in the table history, the stream fails to start from a new checkpoint. Because managed tables clean up historic versions automatically, all specified starting versions are eventually deleted.
See Replay table history.
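The recovery logic in this example can be expressed as a small check before the stream is restarted. The following is a plain-Python sketch under stated assumptions: the helper `next_starting_version` is hypothetical, and you would need to obtain the last processed version and the earliest available version yourself, for example from the target table and the source table history.

```python
def next_starting_version(last_processed, earliest_available):
    """Return the version to resume from, or raise if history no longer covers it."""
    start = last_processed + 1
    if start < earliest_available:
        raise ValueError(
            f"Version {start} is older than the earliest available version "
            f"{earliest_available}, so the stream cannot resume from it."
        )
    return start


# Values from the example above: processed through 75, history from 70.
resume_from = next_starting_version(last_processed=75, earliest_available=70)
print(resume_from)  # 76
```

If the check raises, the gap in history can't be replayed from the change data feed, and the target table likely needs to be rebuilt from a fresh snapshot.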
Read changes in batch queries
You can use batch query syntax to read all changes starting from a particular version or to read changes within a specified range of versions as follows:
- Specify versions as integers and timestamps as strings in the format yyyy-MM-dd[ HH:mm:ss[.SSS]].
- Start and end versions are inclusive. To read from a starting version to the latest version, specify only the starting version.
- If you specify a version before change data feed was enabled, it raises an error.
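The formatting rules above can be sketched as a small plain-Python helper that builds a `table_changes` query string. The helper names `table_changes_sql` and `_bound` are hypothetical, the regular expression is one possible reading of the timestamp format, and the sketch assumes the table name comes from a trusted source because it is not escaped.

```python
import re

# One reading of the format yyyy-MM-dd[ HH:mm:ss[.SSS]].
_TIMESTAMP = re.compile(r"\d{4}-\d{2}-\d{2}( \d{2}:\d{2}:\d{2}(\.\d{3})?)?")


def _bound(value):
    """Render a version as a bare integer or a timestamp as a quoted string."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(value, int) and not isinstance(value, bool):
        if value < 0:
            raise ValueError("versions must be non-negative integers")
        return str(value)
    if isinstance(value, str) and _TIMESTAMP.fullmatch(value):
        return f"'{value}'"
    raise ValueError(f"not a version or timestamp: {value!r}")


def table_changes_sql(table_name, start, end=None):
    """Build a table_changes query. Omit end to read up to the latest version."""
    args = [f"'{table_name}'", _bound(start)]
    if end is not None:
        args.append(_bound(end))
    joined = ", ".join(args)
    return f"SELECT * FROM table_changes({joined})"


print(table_changes_sql("tableName", 0, 10))
# SELECT * FROM table_changes('tableName', 0, 10)
```

The resulting string could be passed to `spark.sql(...)` in a notebook. The helper only checks the shape of the arguments, not whether the versions exist in the table history.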
To use batch reads with starting and ending version options, do the following:
SQL
To read from version 0 to 10, do the following:
SELECT * FROM table_changes('tableName', 0, 10)
To read between two timestamps, do the following:
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
To read from a starting version to the latest, do the following:
SELECT * FROM table_changes('tableName', 0)
To read changes for a table with special characters in the name, do the following:
SELECT * FROM table_changes('`schema`.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')
See table_changes table-valued function.
Python
To read from version 0 to 10, do the following:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
To read between two timestamps, do the following:
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
To read from a starting version to the latest, do the following:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
To read from version 0 to 10, do the following:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
To read between two timestamps, do the following:
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
To read from a starting version to the latest, do the following:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Handle out-of-range versions
By default, if you specify a version or timestamp that exceeds the last commit, the query returns the error timestampGreaterThanLatestCommit.
In Databricks Runtime 11.3 LTS and above, you can enable tolerance for out-of-range versions as follows:
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
When this configuration is enabled, the query returns different results as follows:
- A starting version or timestamp beyond the last commit returns an empty result.
- An ending version or timestamp beyond the last commit returns all changes from the start to the last commit.
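The two behaviors can be summarized with a plain-Python model of how a requested version range resolves. This is a sketch of the rules described above, not the runtime's implementation. The function `resolve_range` and its parameters are hypothetical, and the model covers version numbers only.

```python
def resolve_range(start, end, latest, tolerate_out_of_range=False):
    """Return the (start, end) versions actually read, or None for an empty result."""
    if end is None:
        end = latest  # No ending version means read to the latest commit.
    if start > latest or end > latest:
        if not tolerate_out_of_range:
            raise ValueError("requested version exceeds the latest commit")
        if start > latest:
            return None  # Start beyond the last commit: empty result.
        end = latest  # End beyond the last commit: clamp to the last commit.
    return (start, end)


print(resolve_range(5, 20, latest=10, tolerate_out_of_range=True))   # (5, 10)
print(resolve_range(12, 20, latest=10, tolerate_out_of_range=True))  # None
```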
Legacy change data feed for Delta Lake
Legacy change data feed requires manual configuration for individual Delta Lake tables. Because change data feed isn't included in the Apache Iceberg spec, Apache Iceberg tables aren't supported. Databricks recommends that you migrate to automatic change data feed. See Migrate from legacy change data feed.
When legacy change data feed is turned on, the runtime records change events for all the data written into the table. This includes the row data along with metadata that indicates whether the specified row was inserted, deleted, or updated.
Legacy change data feed uses the same readChangeFeed and table_changes() reading APIs as automatic change data feed. See Incrementally process change data and Read changes in batch queries.
Turn on legacy change data feed
You must explicitly turn on legacy change data feed on individual tables. Use one of the following methods:
New table
Set the table property delta.enableChangeDataFeed = true in the CREATE TABLE command.
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
Note
If you turn off legacy change data feed for any interval of time and then turn it on again, the interval will not be queryable. Use automatic change data feed to query changes during the interval. See Automatic change data feed.
Existing table
Set the table property delta.enableChangeDataFeed = true in the ALTER TABLE command.
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Storage considerations
Managed tables record data changes efficiently and might use other features to optimize storage layout.
With legacy change data feed, you must consider the following storage behavior:
- You might see a small increase in storage costs because changes might be recorded in separate files.
- Some operations, such as insert-only or full-partition deletions, don't generate change data files. Azure Databricks computes the change data feed directly from the transaction log.
- Change data files use the table's retention policy. The VACUUM command deletes change data files, and changes from the transaction log use checkpoint retention policy.
Databricks recommends that you don't attempt to reconstruct the change data feed by directly querying change data files. Always use Delta Lake and Apache Iceberg APIs.
Limitations
Consider the following limitations for change data feeds:
Tables with column mapping
With column mapping enabled on a Delta Lake table, you can drop or rename columns without rewriting data files. See Rename and drop columns with Delta Lake column mapping.
However, change data feeds have limitations after non-additive schema changes. Non-additive schema changes include the following operations:
- Rename or drop columns.
- Alter column data types.
- Alter column nullability, such as with ALTER COLUMN ... SET NOT NULL. See Set a NOT NULL constraint in Azure Databricks.
You can't read change data feeds for a transaction or range in which a non-additive schema change occurs.
To allow for non-additive schema changes before or after the specified range of batch reads, queries use the schema of the range's end version rather than the latest table version. Queries still fail if the version range spans a non-additive schema change.
Automatic change data feed
- Because change data feed isn't supported in the Apache Iceberg spec, external Iceberg clients can't query automatic change data feed. See the Iceberg table spec.
- For multi-statement transactions, if the source table was modified during the transaction, automatic change data feed isn't supported.
- Automatic change data feed isn't supported on tables with row filters or column masks. See Row filters and column masks.
- Change data feed queries can't span table versions where a non-additive schema change occurred, such as a column rename, drop, or data type change. Split the query into ranges before and after the schema change.
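Splitting a query around schema changes can be sketched in plain Python as follows. The helper `split_around_schema_changes` is hypothetical, and you would need to identify the versions with non-additive schema changes yourself, for example from the table history. The sketch assumes that the commit containing the schema change is excluded from both ranges, following the statement that a transaction in which such a change occurs can't be read.

```python
def split_around_schema_changes(start, end, change_versions):
    """Split the inclusive range [start, end] into ranges that skip each change version."""
    ranges, lower = [], start
    for version in sorted(set(change_versions)):
        if version < start or version > end:
            continue  # Changes outside the requested range don't matter.
        if lower <= version - 1:
            ranges.append((lower, version - 1))
        lower = version + 1  # Resume after the schema-change commit.
    if lower <= end:
        ranges.append((lower, end))
    return ranges


# Hypothetical: a column rename happened in version 8.
print(split_around_schema_changes(0, 20, [8]))  # [(0, 7), (9, 20)]
```

Each resulting pair can then be used as the starting and ending version of a separate batch read.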