Getting exception when trying to save a data frame to Azure Data Lake using PySpark

DataEngineer 40 Reputation points
2023-10-24T12:37:45.82+00:00

I have a pipeline that runs every hour, performs some data cleansing, and stores the result as compressed Parquet in the silver zone. The file is saved with the PySpark DataFrame write method in append mode, under a y=2023/MM=10/dd=01/h=10/m=00/filename.parquet path layout.
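
For context, the partition path is built from the run timestamp roughly like this (a simplified sketch; base_path stands in for my silver-zone root, which I have redacted):

from datetime import datetime, timezone

# Build the y=/MM=/dd=/h=/m= partition path from a run timestamp.
def build_silver_path(base_path, run_time=None):
    t = run_time or datetime.now(timezone.utc)
    return f"{base_path}/y={t:%Y}/MM={t:%m}/dd={t:%d}/h={t:%H}/m={t:%M}"

# build_silver_path(base_path) -> ".../y=2023/MM=10/dd=01/h=10/m=00"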

Saving Code

# Save the cleaned DataFrame to ADLS Gen2 in parquet.
# target_file_format and logger are defined elsewhere in the pipeline.
def SaveData(df_cleaned, path):
    try:
        print('Silver path - ' + path)
        # mode() must be set on the DataFrameWriter before save();
        # save() returns None, so chaining .mode() after it fails.
        df_cleaned.write.format(target_file_format).mode("append").save(path)
    except Exception as e:
        logger.error(str(e))
        raise
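
For reference, the same y=/MM=/dd=/h=/m= layout could also be produced by Spark itself if those values existed as DataFrame columns (a sketch with assumed column names, not the code I am currently running):

# Alternative: let Spark create the partition directories from columns.
# Assumes df_cleaned carries columns y, MM, dd, h and m.
(df_cleaned.write
    .format("parquet")
    .mode("append")
    .partitionBy("y", "MM", "dd", "h", "m")
    .save(silver_root))  # silver_root: placeholder for the silver-zone base path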

Error Message

An error occurred while calling o4012.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:284)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:187)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:152)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:152)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:145)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:145)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:129)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:123)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:183)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:901)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:241)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: This operation is not permitted on a non-empty directory.
	at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.delete(AzureNativeFileSystemStore.java:2720)
	at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.delete(AzureNativeFileSystemStore.java:2730)
	at org.apache.hadoop.fs.azure.NativeAzureFileSystem.deleteFile(NativeAzureFileSystem.java:2670)
	at org.apache.hadoop.fs.azure.NativeAzureFileSystem$2.execute(NativeAzureFileSystem.java:2421)
	at org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor.executeParallel(AzureFileSystemThreadPoolExecutor.java:223)
	at org.apache.hadoop.fs.azure.NativeAzureFileSystem.deleteWithoutAuth(NativeAzureFileSystem.java:2433)
	at org.apache.hadoop.fs.azure.NativeAzureFileSystem.delete(NativeAzureFileSystem.java:2483)
	at org.apache.hadoop.fs.azure.NativeAzureFileSystem.delete(NativeAzureFileSystem.java:1977)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.cleanupJob(FileOutputCommitter.java:532)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:416)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:219)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$25(FileFormatWriter.scala:273)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:273)
	... 40 more
Caused by: com.microsoft.azure.storage.StorageException: This operation is not permitted on a non-empty directory.
	at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:87)
	at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:315)
	at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:185)
	at com.microsoft.azure.storage.blob.CloudBlob.delete(CloudBlob.java:981)
	at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.delete(StorageInterfaceImpl.java:314)
	at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.safeDelete(AzureNativeFileSystemStore.java:2672)
	at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.delete(AzureNativeFileSystemStore.java:2711)
Azure Synapse Analytics

1 answer

  1. Bhargava-MSFT 31,261 Reputation points Microsoft Employee Moderator
    2023-10-24T18:43:24.5433333+00:00

    Hello DataEngineer,

    The error message "org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: This operation is not permitted on a non-empty directory." points to a configuration issue rather than a problem in your data.

    It occurs when the connection is configured for Blob storage (the legacy WASB driver; your stack trace goes through org.apache.hadoop.fs.azure.NativeAzureFileSystem) but the storage account is ADLS Gen2 with hierarchical namespace enabled. On such an account the Blob driver cannot delete a non-empty directory, which is exactly what the job's commit/cleanup step attempts.

    To resolve this, configure the storage connection for the correct account type: use the Data Lake Storage Gen2 endpoint rather than the Blob endpoint.
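
    For example (the account and container names below are placeholders), point the save path at the Data Lake Storage Gen2 endpoint, which uses the ABFS driver, instead of the Blob endpoint:

    # Blob endpoint (legacy WASB driver) - fails on an HNS-enabled account:
    # path = "wasbs://silver@<account>.blob.core.windows.net/y=2023/MM=10/dd=01/h=10/m=00"

    # Data Lake Storage Gen2 endpoint (ABFS driver) - the supported combination:
    path = "abfss://silver@<account>.dfs.core.windows.net/y=2023/MM=10/dd=01/h=10/m=00"
    df_cleaned.write.format("parquet").mode("append").save(path)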

    I hope this helps.

