Auto Loader starting with batch ID 0 for every batch run

Sharukh Kundagol 145 Reputation points
2023-09-26T14:56:35.5433333+00:00

Hi Team,

I have the code below, which is scheduled to run once a day. Each run starts with batch ID 0, which suggests that the checkpoint is not working properly: instead of loading only the new files, every run loads all the files in the input directory.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.useIncrementalListing","true")
  .option("cloudFiles.schemaLocation", schema_path)
  .load(file_path)
  .select("*",  current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))
Azure Databricks

An Apache Spark-based analytics platform optimized for Azure.


1 answer

  1. PRADEEPCHEEKATLA 91,866 Reputation points
    2023-09-27T12:43:11.68+00:00

    @Sharukh Kundagol - Thanks for the question and for using the MS Q&A platform.

    It seems that you are using Auto Loader (the cloudFiles source) in Azure Databricks to ingest CSV files from cloud storage. A batch ID of 0 on every run means the stream is not finding any previous progress in its checkpoint, so it treats every file in the input directory as new and loads all of them each time the job runs.

    To solve this issue, you can try the following steps:

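    Make sure that the checkpoint path is valid, accessible by the user running the job, and identical on every run. If checkpoint_path changes between runs (for example, because it includes a date or a run ID) or points to storage that is cleared between runs, each run starts a new stream at batch ID 0. You can also check the logs for errors related to the checkpoint location.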

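    As a test, try changing the trigger option to "once" instead of "availableNow". Both triggers process the data that is available and then stop, and both record their progress in the checkpoint, so this mainly helps rule out a trigger-specific problem. Databricks recommends availableNow for incremental batch workloads, so switch back once the checkpoint behaves as expected.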

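    If the above steps do not work, you can try resetting the checkpoint by deleting the checkpoint directory and running the job again. This forces the job to start from scratch and create a new checkpoint. Note that the first run after a reset reprocesses every file in the input directory, so the target table may receive duplicate rows unless you clear it first.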
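For the first step above, one way to confirm whether progress is being recorded is to look inside the checkpoint directory between runs. Structured Streaming writes one file per batch, named after the batch ID, into the `offsets` and `commits` subfolders of the checkpoint. The sketch below is only an illustration: `latest_batch_id` is a hypothetical helper, not part of any Databricks API, and the hard-coded listing stands in for a real directory listing.

```python
import re


def latest_batch_id(file_names):
    """Return the highest batch ID among checkpoint file names, or None if none exist.

    Hypothetical helper: only names made up entirely of digits are treated as
    batch files; checksum and temporary files are ignored.
    """
    batch_ids = [int(name) for name in file_names if re.fullmatch(r"\d+", name)]
    return max(batch_ids) if batch_ids else None


# On Databricks the names could be collected with something like
# (assuming checkpoint_path has no trailing slash):
#   names = [f.name for f in dbutils.fs.ls(checkpoint_path + "/commits")]
# Assumed sample listing: three committed batches plus a checksum file.
names = ["0", "1", "2", ".1.crc"]
print(latest_batch_id(names))  # prints 2
```

If this returns None, or the `commits` folder does not exist, right before a scheduled run that is not the first, the stream has no saved progress at that path and will start again at batch ID 0.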

    Here is an updated version of your code with the suggested changes:

    from pyspark.sql.functions import current_timestamp

    (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.useIncrementalListing", "true")
      .option("cloudFiles.schemaLocation", schema_path)
      .load(file_path)
      .select("*", current_timestamp().alias("processing_time"))
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(once=True)
      .toTable(table_name))
    

    For more details, refer to https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/triggers

    I hope this helps! Let me know if you have any further questions.
