Delta Lake change data feed on Synapse notebook
Hi Team,
Could you please help us? We are looking to enable the change data feed on an existing Delta file, but it is not working. We have used the steps below to enable it.
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
spark.sql("ALTER TABLE delta.`abfss://******@storage.dfs.core.windows.net/test/incremental_test/` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
df_changes = spark.read.format("delta") \
.option("readChangeData", "true") \
.load(f'abfss://******@storage.dfs.core.windows.net/test/incremental_test/')
AnalysisException: No startingVersion or startingTimestamp provided for CDC read.
After running the ALTER script, the _change_type, _commit_version, and _commit_timestamp columns are not being created in the Delta file. Can you please help me with the process and share detailed steps on how to enable the change data feed? Please also provide a screenshot of the table once it is enabled.
Can you please provide the whole process as example code: create a dummy DataFrame, write it as Delta, enable change data feed, and read the change data feed without passing a starting version or starting timestamp? That would help a lot!
Thanks in Advance
Thank you
Azure Synapse Analytics
-
phemanth • 15,755 Reputation points • Microsoft External Staff • Moderator
2025-05-02T07:59:05.59+00:00 @SaiSekhar, MahasivaRavi (Philadelphia)
It seems you're trying to enable the change data feed (CDF) for an existing Delta table in Azure Synapse but running into an issue where the necessary change tracking columns (_change_type, _commit_version, _commit_timestamp) aren't appearing. Let's walk through the steps to ensure it's set up correctly.
Steps to Enable Change Data Feed on an Existing Delta Table:
- Before enabling CDF, double-check that your Delta table doesn't have any columns that conflict with the required CDF metadata columns. If your table has columns named _change_type, _commit_version, or _commit_timestamp, you'll need to rename them.
- You’ve correctly set the property using the ALTER TABLE command, so just ensure that it has executed successfully. Here’s a reminder of the command:
spark.sql("ALTER TABLE delta.`abfss://******@storage.dfs.core.windows.net/test/incremental_test/` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
- After running the ALTER command, you can check whether the property was set correctly by executing:
DESCRIBE DETAIL delta.`abfss://******@storage.dfs.core.windows.net/test/incremental_test/`;
Look for delta.enableChangeDataFeed in the properties column of the output.
- To read changes without specifying a startingVersion or startingTimestamp, you should wrap your read command in a stream as follows (a batch read always requires a starting point; see the sketch after this list):
df_changes = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .load("abfss://******@storage.dfs.core.windows.net/test/incremental_test/")
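If you prefer a batch read, the AnalysisException from the question goes away once a starting point is supplied. A minimal sketch, assuming CDF was enabled at commit version 1 of this table (run DESCRIBE HISTORY on the path to confirm the actual version); the masked path is the one from the question:

df_changes = spark.read.format("delta") \
    .option("readChangeData", "true") \
    .option("startingVersion", 1) \
    .load("abfss://******@storage.dfs.core.windows.net/test/incremental_test/")
df_changes.show()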
I have created a Delta table on my side with dummy data:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("DeltaTableExample") \
    .getOrCreate()

# Create a DataFrame
data = [("link", "zelda"), ("king k rool", "donkey kong"), ("samus", "metroid")]
columns = ["character", "franchise"]
df = spark.createDataFrame(data, columns)

# Write the DataFrame to a Delta table named "my_delta_table"
df.write.format("delta").saveAsTable("my_delta_table")
I have enabled change data feed on that created table and verified the status:
spark.sql("ALTER TABLE my_delta_table SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
I have read the change data feed for the table:
changes_df = spark.read.format("delta").option("readChangeData", "true") \
    .option("startingVersion", 1).option("endingVersion", 5) \
    .table("my_delta_table")
changes_df.show()
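Note that the read above only returns rows for commits made after CDF was enabled; the initial saveAsTable commit (version 0) predates it. A minimal sketch of producing a change to read, reusing the table from the example above:

spark.sql("INSERT INTO my_delta_table VALUES ('mario', 'super mario')")

changes_df = spark.read.format("delta") \
    .option("readChangeData", "true") \
    .option("startingVersion", 1) \
    .table("my_delta_table")
# The new row appears with _change_type = 'insert'
changes_df.show()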
Hope this helps. Do let us know if you have any further queries.
-
SaiSekhar, MahasivaRavi (Philadelphia) • 140 Reputation points
2025-05-02T08:41:49.2433333+00:00 Hi @phemanth,
Thanks for the update. We are looking to enable CDF on Delta as a file, not a table. Can you please try to read the change data feed without passing a starting version in the PySpark syntax, and try to enable CDF on Delta as a file? Please share the code for it.
Thanks in Advance
Thank you!
-
SaiSekhar, MahasivaRavi (Philadelphia) • 140 Reputation points
2025-05-02T11:26:10.2433333+00:00 Hi @phemanth,
can you please provide an update on this.
Thank you!
-
phemanth • 15,755 Reputation points • Microsoft External Staff • Moderator
2025-05-05T06:03:37.1366667+00:00 @SaiSekhar, MahasivaRavi (Philadelphia)
Here is how to enable Change Data Feed (CDF) on a Delta file and read the change data feed without specifying the starting version in PySpark.
Please check the code below: it creates a dummy DataFrame, writes it as a Delta file, enables CDF, and reads the changes:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("DeltaFileExample") \
    .getOrCreate()

# Create a DataFrame
data = [("link", "zelda"), ("king k rool", "donkey kong"), ("samus", "metroid")]
columns = ["character", "franchise"]
df = spark.createDataFrame(data, columns)

# Write the DataFrame to a Delta file
df.write.format("delta").save("abfss://******@storage.dfs.core.windows.net/test/incremental_test/")

# Enable Change Data Feed on the Delta file
spark.sql("ALTER TABLE delta.`abfss://******@storage.dfs.core.windows.net/test/incremental_test/` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# Read the Change Data Feed without specifying starting version
df_changes = spark.read.format("delta") \
    .option("readChangeData", "true") \
    .load("abfss://******@storage.dfs.core.windows.net/test/incremental_test/")
df_changes.show()
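One caveat on the script above: a batch read of the change feed still requires a startingVersion or startingTimestamp, so the final step raises the same AnalysisException from the original question. A minimal sketch of one way around it, assuming the target path does not yet contain a Delta table (the session default below only applies table properties at table creation time):

# With this default set, new Delta tables have CDF enabled from version 0
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")

df.write.format("delta").mode("overwrite").save("abfss://******@storage.dfs.core.windows.net/test/incremental_test/")

# Every commit is tracked from the start, so version 0 is a valid starting point
df_changes = spark.read.format("delta") \
    .option("readChangeData", "true") \
    .option("startingVersion", 0) \
    .load("abfss://******@storage.dfs.core.windows.net/test/incremental_test/")
df_changes.show()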
Troubleshooting Tips
- Ensure there are no conflicting column names (_change_type, _commit_version, _commit_timestamp) in your Delta file.
- Verify that the ALTER TABLE command executed successfully and the property is set correctly.
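A minimal sketch for checking both points programmatically on a path-based table, reusing the masked path from above:

# The table history shows a SET TBLPROPERTIES operation if the ALTER succeeded
spark.sql("DESCRIBE HISTORY delta.`abfss://******@storage.dfs.core.windows.net/test/incremental_test/`") \
    .select("version", "operation").show(truncate=False)

# The properties map should contain delta.enableChangeDataFeed = 'true'
detail = spark.sql("DESCRIBE DETAIL delta.`abfss://******@storage.dfs.core.windows.net/test/incremental_test/`") \
    .select("properties").collect()[0][0]
print(detail.get("delta.enableChangeDataFeed"))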
-
SaiSekhar, MahasivaRavi (Philadelphia) • 140 Reputation points
2025-05-05T11:10:34.1566667+00:00 Hi @phemanth,
Thanks for the update. After running the pyspark script provided by you, we are getting the below error, and the columns are also not being created in the Delta file. We are currently running this in a Synapse notebook.
Can you please help me with the code and please send the result screenshots as well, please let us know if we need to install any python modules.
Thank you!
-
SaiSekhar, MahasivaRavi (Philadelphia) • 140 Reputation points
2025-05-06T05:24:32.77+00:00 Hi @phemanth,
can you please provide an update on this. Thank you!
-
Dileep Raj Narayan Thumula • 255 Reputation points • Microsoft External Staff • Moderator
2025-05-06T10:50:51.98+00:00 Hello
@SaiSekhar, MahasivaRavi (Philadelphia),
In Delta Lake Change Data Feed (CDF), the special columns (_change_type, _commit_version, _commit_timestamp) do not exist in the base table. They appear only when you query the table with CDF enabled, so you won't see them if you do a normal SELECT * FROM the Delta table.
Also, in Synapse, ensure you are using Delta Lake 2.0+, because CDF works starting from Delta 2.0.
I have tried the below:
from pyspark.sql import SparkSession

data = [("link", "zelda"), ("king k rool", "donkey kong"), ("samus", "metroid")]
columns = ["character", "franchise"]
df = spark.createDataFrame(data, columns)

delta_path = "abfss://******@allnewsynp.dfs.core.windows.net/f1/incremental_test/"
df.write.format("delta").mode("overwrite").save(delta_path)

spark.sql("DROP TABLE IF EXISTS incremental_test")
spark.sql(f"CREATE TABLE incremental_test USING DELTA LOCATION '{delta_path}'")
spark.sql("ALTER TABLE incremental_test SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
spark.sql("UPDATE incremental_test SET franchise = 'nintendo' WHERE character = 'link'")
In the above code I am:
- Registering it as a table in the catalog (this lets you query and stream it by name with .table())
- Enabling Change Data Feed
- Doing an update to create a new version (CDF only returns rows when there are changes)
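To actually see the change rows produced by that UPDATE, here is a minimal batch-read sketch, assuming the SET TBLPROPERTIES commit was version 1 of the table (DESCRIBE HISTORY incremental_test confirms the real number):

changes_df = spark.read.format("delta") \
    .option("readChangeData", "true") \
    .option("startingVersion", 1) \
    .table("incremental_test")
# Expect update_preimage/update_postimage rows plus _change_type, _commit_version, _commit_timestamp
changes_df.show(truncate=False)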
-
SaiSekhar, MahasivaRavi (Philadelphia) • 140 Reputation points
2025-05-06T11:41:48.0733333+00:00 Hi @Dileep Raj Narayan Thumula,
Thanks for the update, we are almost there. We are able to read the data using the below syntax, but we are not able to display and write the DataFrame. Can you please help me with the syntax and share the result screenshots?
Please share the write syntax and its result. Thank you for helping us.
df_changes = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .load("abfss://******@storage.dfs.core.windows.net/test/incremental_check/")
-
Dileep Raj Narayan Thumula • 255 Reputation points • Microsoft External Staff • Moderator
2025-05-06T12:33:15.4166667+00:00 df_changes = spark.readStream.format("delta") \
.option("readChangeData", "true") \
.table("incremental_test")
query = df_changes.writeStream \
.format("console") \
.option("checkpointLocation", "abfss://******@allnewsynp.dfs.core.windows.net/f1/ITC/") \
.start()
import time
time.sleep(60)
query.stop()
@SaiSekhar, MahasivaRavi (Philadelphia), can you try the above and let me know if it helped you? I will share the results at my end too.
-
SaiSekhar, MahasivaRavi (Philadelphia) • 140 Reputation points
2025-05-06T13:41:53.1766667+00:00 Hi @Dileep Raj Narayan Thumula ,
Thanks for the update,
The write command is working, but where can we see the CDF data? We are seeing the below files in the folder.
Is it possible to display the streaming DataFrame? Can you please share your inputs on whether it is working as expected? We are able to write but not able to see the written data; which location did it get saved to?
Can you please share the output? Thank you for sharing this!
query = df_changes.writeStream \
    .format("console") \
    .option("checkpointLocation", "abfss://******@storage.dfs.core.windows.net/test/incremental_insert_new/") \
    .start()
-
SaiSekhar, MahasivaRavi (Philadelphia) • 140 Reputation points
2025-05-06T16:17:34.3+00:00 Hi @Dileep Raj Narayan Thumula ,
Can you please provide an update? Thank you.
-
Dileep Raj Narayan Thumula • 255 Reputation points • Microsoft External Staff • Moderator
2025-05-07T11:30:43.9533333+00:00 Hi @SaiSekhar, MahasivaRavi (Philadelphia)
query = df_changes.writeStream \
    .format("console") \
    .option("checkpointLocation", "abfss://******@storage.dfs.core.windows.net/test/incremental_insert_new/") \
    .start()
This prints the CDF records directly in your Databricks or Synapse notebook output window; this is how streaming DataFrames are "displayed".
As you asked, "Can we display the stream data frame?" Yes, using .format('console') as we are doing, the records are displayed in the output cell logs of the notebook or job run.
The code below writes and then views the saved CDF records (in Delta or Parquet files):
# Write the change feed to a Delta location
query = df_changes.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "abfss://******@storage.dfs.core.windows.net/test/cdf_checkpoint/") \
    .start("abfss://******@storage.dfs.core.windows.net/test/cdf_output/")

# Let the stream run for a minute, then stop it
import time
time.sleep(60)
query.stop()

# Read back the saved CDF records
df_written = spark.read.format("delta").load("abfss://******@storage.dfs.core.windows.net/test/cdf_output/")
df_written.show()
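If the console sink output is hard to locate in Synapse (it may land in the driver logs rather than the cell output), here is a minimal alternative sketch using foreachBatch, which hands each micro-batch to a function as an ordinary DataFrame so it can be printed in the notebook; the checkpoint path below is a hypothetical placeholder:

def show_batch(batch_df, batch_id):
    # Each micro-batch arrives as a normal batch DataFrame
    print(f"--- micro-batch {batch_id} ---")
    batch_df.show(truncate=False)

query = df_changes.writeStream \
    .foreachBatch(show_batch) \
    .option("checkpointLocation", "abfss://******@storage.dfs.core.windows.net/test/cdf_display_checkpoint/") \
    .start()

import time
time.sleep(60)
query.stop()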
-
Dileep Raj Narayan Thumula • 255 Reputation points • Microsoft External Staff • Moderator
2025-05-08T10:02:46.2266667+00:00 Hi @SaiSekhar, MahasivaRavi (Philadelphia),
We haven’t heard from you on the last response and were just checking back to see if you have a resolution yet. If you have a resolution, please do share it with the community, as it can be helpful to others. Otherwise, we will respond with more details and try to help.
-
Dileep Raj Narayan Thumula • 255 Reputation points • Microsoft External Staff • Moderator
2025-05-09T12:08:11.1266667+00:00 Hi @SaiSekhar, MahasivaRavi (Philadelphia),
We haven’t heard from you on the last response and were just checking back to see if you have a resolution yet. If you have a resolution, please do share it with the community, as it can be helpful to others. Otherwise, we will respond with more details and try to help.
-
Shraddha Pore • 525 Reputation points • Microsoft External Staff • Moderator
2025-05-15T01:55:25.58+00:00 Hi @SaiSekhar, MahasivaRavi (Philadelphia),
We haven’t heard from you on the last response and were just checking back to see if you have a resolution yet. If you have a resolution, please do share it with the community, as it can be helpful to others. Otherwise, we will respond with more details and try to help.