Use Delta Lake change data feed on Azure Databricks

Change data feed allows Azure Databricks to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records change events for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated.

You can read the change events in batch queries using Spark SQL and Apache Spark DataFrames, and in streaming queries using Structured Streaming.

Important

Change data feed works in tandem with table history to provide change information. Because cloning a Delta table creates a separate history, the change data feed on cloned tables doesn’t match that of the original table.

Use cases

Change data feed is not enabled by default. Consider enabling it for the following use cases:

  • Silver and Gold tables: Improve Delta Lake performance by processing only the row-level changes that follow initial MERGE, UPDATE, or DELETE operations, which accelerates and simplifies ETL and ELT operations.
  • Materialized views: Create up-to-date, aggregated views of information for use in BI and analytics without reprocessing the full underlying tables, updating only where changes have come through.
  • Transmit changes: Send a change data feed to downstream systems such as Kafka or an RDBMS, which can use it to process data incrementally in later stages of data pipelines.
  • Audit trail table: Capturing the change data feed as a Delta table provides perpetual storage and efficient query capability to see all changes over time, including when deletes occur and what updates were made.

Enable change data feed

You must explicitly enable the change data feed option using one of the following methods:

  • New table: Set the table property delta.enableChangeDataFeed = true in the CREATE TABLE command.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Existing table: Set the table property delta.enableChangeDataFeed = true in the ALTER TABLE command.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
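  • All new tables: Set the following Spark configuration so that new tables have the change data feed enabled by default: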

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Important

Only changes made after you enable the change data feed are recorded; past changes to a table are not captured.

Change data storage

Azure Databricks records change data for UPDATE, DELETE, and MERGE operations in the _change_data folder under the table directory. Some operations, such as insert-only operations and full partition deletes, do not generate data in the _change_data directory because Azure Databricks can efficiently compute the change data feed directly from the transaction log.

The files in the _change_data folder follow the retention policy of the table. Therefore, when you run the VACUUM command, change data feed files outside the retention period are also deleted.

Read changes in batch queries

You can provide either a version or a timestamp for the start and end. The start and end versions and timestamps are inclusive. To read the changes from a particular start version to the latest version of the table, specify only the starting version or timestamp.

You specify a version as an integer and a timestamp as a string in the format yyyy-MM-dd[ HH:mm:ss[.SSS]].

If you provide a version lower than, or a timestamp older than, the first one with recorded change events (that is, from before the change data feed was enabled), an error is thrown indicating that the change data feed was not enabled.
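The version-or-timestamp rules above can be captured in a small helper. The following is a minimal pure-Python sketch; `change_feed_options` and `_check_timestamp` are hypothetical names, not part of any Databricks API. It assumes that an `int` means a version and a `str` means a timestamp, checks timestamps against the documented yyyy-MM-dd[ HH:mm:ss[.SSS]] format, and returns the same option names used in the DataFrame examples that follow (readChangeFeed, startingVersion, endingVersion, startingTimestamp, endingTimestamp).

```python
import re
from datetime import datetime
from typing import Dict, Optional, Union

Bound = Union[int, str]

# Shape of the documented format yyyy-MM-dd[ HH:mm:ss[.SSS]].
_TS_PATTERN = re.compile(r"[0-9]{4}-[0-9]{2}-[0-9]{2}( [0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]{3})?)?")


def _check_timestamp(ts: str) -> str:
    """Return ts unchanged, or raise ValueError if it is malformed."""
    if not _TS_PATTERN.fullmatch(ts):
        raise ValueError(f"{ts!r} does not match yyyy-MM-dd[ HH:mm:ss[.SSS]]")
    # The regex only checks the shape; strptime also rejects dates like 2021-02-30.
    fmt = "%Y-%m-%d"
    if len(ts) > 10:
        fmt += " %H:%M:%S"
    if len(ts) > 19:
        fmt += ".%f"
    datetime.strptime(ts, fmt)
    return ts


def change_feed_options(start: Bound, end: Optional[Bound] = None) -> Dict[str, str]:
    """Build change data feed reader options; int means version, str means timestamp."""
    options = {"readChangeFeed": "true"}
    for prefix, bound in (("starting", start), ("ending", end)):
        if bound is None:
            if prefix == "starting":
                raise ValueError("a starting version or timestamp is required")
            continue  # no end bound: read up to the latest version
        if isinstance(bound, bool) or not isinstance(bound, (int, str)):
            raise TypeError("bounds must be int versions or str timestamps")
        if isinstance(bound, int):
            if bound < 0:
                raise ValueError("versions are non-negative integers")
            options[prefix + "Version"] = str(bound)
        else:
            options[prefix + "Timestamp"] = _check_timestamp(bound)
    return options
```

You could then pass the result to a reader, for example `spark.read.format("delta").options(**change_feed_options(0, 10)).table("myDeltaTable")`. The values are returned as strings because Spark reader options are string-valued.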

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamps as formatted strings
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

Read changes in streaming queries

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

To get the change data while reading the table, set the option readChangeFeed to true. The startingVersion and startingTimestamp options are optional. If neither is provided, the stream returns the latest snapshot of the table at the time the stream starts as inserts, followed by future changes as change data. Rate-limit options (maxFilesPerTrigger, maxBytesPerTrigger) and excludeRegex are also supported when reading change data.

Note

For versions other than the starting snapshot version, rate limiting can be atomic at the commit level. That is, either the entire commit version is held back by the rate limit or the entire commit is returned.
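To make the per-commit behavior concrete, here is a simplified pure-Python model of atomic rate limiting. It is an illustration under stated assumptions, not Delta Lake's actual scheduler: `plan_batches` is a hypothetical helper, each commit is described only by a (version, file_count) pair, and a micro-batch stops admitting commits once the next one would push it past `max_files` (loosely analogous to maxFilesPerTrigger). Because commits are never split, a single commit larger than the limit is still returned whole.

```python
from typing import List, Tuple


def plan_batches(commits: List[Tuple[int, int]], max_files: int) -> List[List[int]]:
    """Group (version, file_count) commits, assumed sorted by version, into micro-batches of whole commits."""
    if max_files < 1:
        raise ValueError("max_files must be at least 1")
    batches: List[List[int]] = []
    current: List[int] = []
    used = 0
    for version, files in commits:
        # Close the current batch if this commit would overflow it.
        if current and used + files > max_files:
            batches.append(current)
            current, used = [], 0
        # The commit is never split: it lands whole in the current batch,
        # even when it alone exceeds max_files.
        current.append(version)
        used += files
    if current:
        batches.append(current)
    return batches
```

For example, with a limit of 3 files, commits of 1, 1, 1, and 5 files yield the batches [1, 2, 3] and [4]: the 5-file commit is delivered in one batch rather than being cut at 3 files.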

By default, if a user passes in a version or timestamp exceeding the last commit on a table, the error timestampGreaterThanLatestCommit is thrown. In Databricks Runtime 11.3 LTS and above, change data feed can handle the out-of-range version case if the user sets the following configuration to true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

If you provide a start version greater than the last commit on a table or a start timestamp newer than the last commit on a table, then when the preceding configuration is enabled, an empty read result is returned.

If you provide an end version greater than the last commit on a table or an end timestamp newer than the last commit on a table, then when the preceding configuration is enabled in batch read mode, all changes between the start version and the last commit are returned.
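The out-of-range rules for batch reads can be summarized as a small decision function. This is a minimal sketch: `resolve_version_range` and `OutOfRangeError` are hypothetical names, it works only with versions (not timestamps), and it models batch reads, where an end bound past the last commit is clamped. It returns the inclusive range to read, or `None` for an empty result.

```python
from typing import Optional, Tuple


class OutOfRangeError(Exception):
    """Hypothetical stand-in for the timestampGreaterThanLatestCommit error."""


def resolve_version_range(
    start: int,
    end: Optional[int],
    latest: int,
    out_of_range_enabled: bool,
) -> Optional[Tuple[int, int]]:
    """Return the inclusive (start, end) versions a batch read covers, or None if empty."""
    if end is not None and end < start:
        # This sketch rejects inverted ranges.
        raise ValueError("end version must not be lower than start version")
    past_latest = start > latest or (end is not None and end > latest)
    if past_latest and not out_of_range_enabled:
        # Default behavior: any bound beyond the last commit is an error.
        raise OutOfRangeError("timestampGreaterThanLatestCommit")
    if start > latest:
        # Start beyond the last commit with the setting enabled: empty result.
        return None
    # No end bound, or end beyond the last commit: read up to the last commit.
    if end is None or end > latest:
        end = latest
    return (start, end)
```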

What is the schema for the change data feed?

When you read from the change data feed for a table, the schema for the latest table version is used.

Note

Most schema change and evolution operations are fully supported. Tables with column mapping enabled do not support all use cases and exhibit different behavior. See Change data feed limitations for tables with column mapping enabled.

In addition to the data columns from the schema of the Delta table, change data feed contains metadata columns that identify the type of change event:

Column name        Type       Values
_change_type       String     insert, update_preimage, update_postimage, delete (1)
_commit_version    Long       The Delta log or table version containing the change.
_commit_timestamp  Timestamp  The timestamp of when the commit was created.

(1) preimage is the value before the update, postimage is the value after the update.
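A common way to consume these metadata columns, for example when propagating changes from a silver table to a gold table, is to replay the rows in commit order: apply insert and update_postimage rows as upserts, apply delete rows as removals, and skip update_preimage rows. The following pure-Python sketch shows that logic on plain dictionaries. `apply_changes` and the key column `id` are hypothetical, and the sketch assumes each key has at most one net change per commit. In Spark the same idea is typically expressed with a MERGE.

```python
from typing import Dict, Iterable

METADATA_COLUMNS = ("_change_type", "_commit_version", "_commit_timestamp")


def apply_changes(state: Dict[object, dict], changes: Iterable[dict], key: str = "id") -> Dict[object, dict]:
    """Apply change data feed rows to a dict keyed by `key` and return it."""
    # Replay commits in version order so later versions win; sorted() is stable,
    # so rows within one commit keep their input order.
    for row in sorted(changes, key=lambda r: r["_commit_version"]):
        change_type = row["_change_type"]
        # Drop the metadata columns, keeping only the table's data columns.
        data = {k: v for k, v in row.items() if k not in METADATA_COLUMNS}
        if change_type in ("insert", "update_postimage"):
            state[data[key]] = data
        elif change_type == "delete":
            state.pop(data[key], None)
        elif change_type == "update_preimage":
            continue  # old values before the update; useful for auditing, not for upserts
        else:
            raise ValueError(f"unexpected _change_type {change_type!r}")
    return state
```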

Note

You cannot enable change data feed on a table if the schema contains columns with the same names as these added columns. Rename columns in the table to resolve this conflict before trying to enable change data feed.
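Before enabling the change data feed, you could check a table's column names for these conflicts. This is a minimal sketch in which `conflicting_cdf_columns` is a hypothetical helper. It compares names case-insensitively as a conservative assumption, because Spark resolves column names case-insensitively by default.

```python
from typing import Iterable, List

# Metadata column names that the change data feed adds.
RESERVED_CDF_COLUMNS = frozenset({"_change_type", "_commit_version", "_commit_timestamp"})


def conflicting_cdf_columns(column_names: Iterable[str]) -> List[str]:
    """Return the table columns, in their original order and spelling, that clash with CDF metadata columns."""
    return [name for name in column_names if name.lower() in RESERVED_CDF_COLUMNS]
```

For example, `conflicting_cdf_columns(spark.table("myDeltaTable").columns)` lists the columns to rename first. An empty list means no conflict of this kind was found.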

Change data feed limitations for tables with column mapping enabled

With column mapping enabled on a Delta table, you can drop or rename columns in the table without rewriting data files for existing data. However, with column mapping enabled, change data feed has limitations after non-additive schema changes such as renaming or dropping a column, changing a data type, or changing nullability.

Important

  • You cannot use batch semantics to read the change data feed for a transaction or range in which a non-additive schema change occurs.
  • In Databricks Runtime 13.0 and below, tables with column mapping enabled that have experienced non-additive schema changes do not support streaming reads on change data feed. See Streaming with column mapping and schema changes.
  • In Databricks Runtime 12.0 and below, you cannot read change data feed for tables with column mapping enabled that have experienced column renaming or dropping.

In Databricks Runtime 12.1 and above, you can perform batch reads on change data feed for tables with column mapping enabled that have experienced non-additive schema changes. Instead of using the schema of the latest version of the table, read operations use the schema of the end version of the table specified in the query. Queries still fail if the version range specified spans a non-additive schema change.
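The batch-read rule for Databricks Runtime 12.1 and above can be sketched as a check over the table's history. This is a minimal model, not a Databricks API: `schema_version_for_batch_read` is a hypothetical helper, it assumes you already know which versions contain non-additive schema changes, and it treats a change at any version inside the inclusive [start, end] range as spanning that range. When the check passes, it returns the end version, whose schema the read uses.

```python
from typing import Iterable


def schema_version_for_batch_read(start: int, end: int, non_additive_versions: Iterable[int]) -> int:
    """Return the version whose schema a batch CDF read would use, or raise if the range spans a non-additive change."""
    if end < start:
        raise ValueError("end version must not be lower than start version")
    spanned = sorted(v for v in non_additive_versions if start <= v <= end)
    if spanned:
        raise ValueError(f"range [{start}, {end}] spans non-additive schema change(s) at {spanned}")
    # The read uses the schema of the end version, not the latest table version.
    return end
```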

Frequently asked questions (FAQ)

What is the overhead of enabling the change data feed?

There is no significant impact. The change data records are generated in line during the query execution process, and are generally much smaller than the total size of rewritten files.

What is the retention policy for change records?

Change records follow the same retention policy as out-of-date table versions, and will be cleaned up through VACUUM if they are outside the specified retention period.

When do new records become available in the change data feed?

Change data is committed along with the Delta Lake transaction, and will become available at the same time as the new data is available in the table.

Notebook example: Propagate changes with Delta change data feed

This notebook shows how to propagate changes made to a silver table of absolute vaccination counts to a gold table of vaccination rates.

Change data feed notebook
