An Apache Spark-based analytics platform optimized for Azure.
@Sharukh Kundagol - Thanks for the question and using MS Q&A platform.
It seems that you are using Azure Databricks to load data from Azure Data Lake Storage Gen1 or Gen2 using the cloudFiles connector. The issue you are facing is that the batch ID is always 0, which means that the checkpoint is not working properly and all files are being loaded every time the job runs.
To solve this issue, you can try the following steps:
Make sure that the checkpoint path is valid and accessible by the user running the job. You can check the logs to see if there are any errors related to the checkpoint location.
Try changing the trigger option to "once" instead of "availableNow". This will ensure that the job runs only once and the checkpoint is properly updated.
If the above steps do not work, you can try resetting the checkpoint location by deleting the checkpoint directory and running the job again. This will force the job to start from scratch and create a new checkpoint.
Here is an updated version of your code with the suggested changes:
spark.readStream .format("cloudFiles") .option("cloudFiles.format", "csv") .option("cloudFiles.useIncrementalListing","true") .option("cloudFiles.schemaLocation", schema_path) .load(file_path) .select("*", current_timestamp().alias("processing_time")) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(once=True) .toTable(table_name))
For more details, refer to https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/triggers
I hope this helps! Let me know if you have any further questions.