I have a pipeline that runs every hour, does some data cleansing, and stores the result as compressed Parquet in the silver zone. The file is saved with the PySpark DataFrame write method in append mode, under a y=2023/MM=10/dd=01/h=10/m=00/filename.parquet path layout.
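For context, the target path for each run is derived from the trigger time, roughly like this (build_silver_path and the base path here are simplified stand-ins for my actual scheduling code):

from datetime import datetime, timezone

def build_silver_path(base_path, run_time):
    # Produces the y=2023/MM=10/dd=01/h=10/m=00 partition folders
    return base_path + run_time.strftime('y=%Y/MM=%m/dd=%d/h=%H/m=%M')

silver_path = build_silver_path(
    'abfss://silver@<storage-account>.dfs.core.windows.net/cleansed/',
    datetime.now(timezone.utc),
)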
Saving Code
# Save cleansed data to the silver zone in ADLS Gen2 as Parquet
def SaveData(df_cleaned, path):
    try:
        print('Silver path - ' + path)
        # target_file_format is 'parquet'; mode must be set before save()
        df_cleaned.write.format(target_file_format).mode("append").save(path)
    except Exception as e:
        logger.error(str(e))
        raise
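The function is invoked once per hourly trigger, along these lines (a simplified sketch of my call site; target_file_format and logger are defined at module level, and df_cleaned is the output of the cleansing step):

import logging

logger = logging.getLogger(__name__)
target_file_format = 'parquet'  # Spark writes Parquet with snappy compression by default

SaveData(df_cleaned, silver_path)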
Error Message
An error occurred while calling o4012.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:284)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:187)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:152)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:152)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:145)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:145)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:129)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:123)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:183)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:901)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:382)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:241)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: This operation is not permitted on a non-empty directory.
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.delete(AzureNativeFileSystemStore.java:2720)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.delete(AzureNativeFileSystemStore.java:2730)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem.deleteFile(NativeAzureFileSystem.java:2670)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem$2.execute(NativeAzureFileSystem.java:2421)
at org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor.executeParallel(AzureFileSystemThreadPoolExecutor.java:223)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem.deleteWithoutAuth(NativeAzureFileSystem.java:2433)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem.delete(NativeAzureFileSystem.java:2483)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem.delete(NativeAzureFileSystem.java:1977)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.cleanupJob(FileOutputCommitter.java:532)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:416)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:219)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$25(FileFormatWriter.scala:273)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:273)
... 40 more
Caused by: com.microsoft.azure.storage.StorageException: This operation is not permitted on a non-empty directory.
at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:87)
at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:315)
at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:185)
at com.microsoft.azure.storage.blob.CloudBlob.delete(CloudBlob.java:981)
at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.delete(StorageInterfaceImpl.java:314)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.safeDelete(AzureNativeFileSystemStore.java:2672)
at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.delete(AzureNativeFileSystemStore.java:2711)