Getting the below error when appending data to a Parquet file stored in Azure Data Lake Gen2

Jayesh Yadav 0 Reputation points
2023-09-20T09:50:25.3766667+00:00
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit
import mysql.connector
from azure.storage.filedatalake import DataLakeServiceClient
import os


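# Build a Spark session with the MySQL JDBC driver on the driver classpath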
spark = SparkSession.builder \
    .appName("MySQL Incremental Load") \
    .config("spark.driver.extraClassPath", "D:/mysql-connector-java-8.0.22/mysql-connector-java-8.0.22.jar") \
    .getOrCreate()


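# MySQL connection details (host, port, and credentials redacted)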
jdbc_url = "jdbc:mysql://IP:PORT/Database"
properties = {
    "user": "root",
    "password": "pwd",
    "driver": "com.mysql.cj.jdbc.Driver"
}


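# Read the last processed ID for the "record" table from a local checkpoint CSV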
last_processed_df = spark.read.csv("C:\\Users\\Intel\\Downloads\\Untitled Folder\\file.csv", header=True)
last_processed_id_row = last_processed_df.filter(last_processed_df["table"] == "record").collect()[0]
last_processed_id = last_processed_id_row["last_id"]


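# Pull only rows with an identifier greater than the last processed ID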
query = f"(SELECT * FROM record_backup WHERE Unique_identifier > {last_processed_id}) as tmp"
df_incremental = spark.read.jdbc(jdbc_url, query, properties=properties)


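# Stage the incremental batch locally; Spark writes a directory of part files at this path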
local_parquet_path = "temp_incremental_data.parquet"
df_incremental.write.mode("overwrite").parquet(local_parquet_path)



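# Connect to the ADLS Gen2 account (account key redacted)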
service_client = DataLakeServiceClient(
    account_url="https://storage_name.dfs.core.windows.net",
    credential="==")
file_system_client = service_client.get_file_system_client(file_system="container")
directory_client = file_system_client.get_directory_client("blob folder")
file_client = directory_client.get_file_client("existing_file_name.parquet")

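# Upload the local bytes to the existing remote file (offset=0 writes from the start of the file)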
with open(local_parquet_path, 'rb') as local_file:
    data = local_file.read()
    file_client.append_data(data, offset=0, length=len(data))
    file_client.flush_data(len(data))


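# Advance the checkpoint to the new maximum ID and overwrite the checkpoint CSV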
new_max_id = df_incremental.agg({"Unique_identifier": "max"}).collect()[0][0]
updated_df = last_processed_df.withColumn("last_id", 
                                          when(last_processed_df["table"] == "record", lit(new_max_id))
                                          .otherwise(last_processed_df["last_id"]))
updated_df.write.mode("overwrite").option("header", "true").csv("C:\\Users\\Intel\\Downloads\\Untitled Folder\\file.csv")


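# Remove the temporary local Parquet data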
os.remove(local_parquet_path)


spark.stop()

Error:

Py4JJavaError: An error occurred while calling o40.parquet.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Azure Data Lake Storage

1 answer

  1. Bhargava-MSFT 31,261 Reputation points Microsoft Employee Moderator
    2023-09-20T22:14:31.3766667+00:00

    Hello Jayesh Yadav,

    Welcome to the Microsoft Q&A forum.

    A similar error message has been discussed in the threads below.

    https://stackoverflow.com/questions/41851066/exception-in-thread-main-java-lang-unsatisfiedlinkerror-org-apache-hadoop-io

    https://stackoverflow.com/questions/18630019/running-apache-hadoop-2-1-0-on-windows

    Per the error message, there may be an issue with the Hadoop native libraries or with your environment configuration. On Windows, an UnsatisfiedLinkError on NativeIO$Windows.access0 typically indicates that winutils.exe and hadoop.dll are missing or do not match the Hadoop version your Spark build uses.

    Please check that your Hadoop configuration is set up correctly in your environment. This includes setting the HADOOP_HOME environment variable, adding the Hadoop binaries to the PATH, and making sure the Hadoop native libraries are installed and configured correctly in the environment where the Java application is running.

    Also, check that the Spark configuration is set up correctly in your environment. This may include setting the SPARK_HOME environment variable and adding the Spark binaries to the PATH; a minimal sketch covering both checks is shown below.
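    As a rough illustration, here is a minimal sketch of that environment setup, assuming winutils.exe and hadoop.dll matching your Hadoop version are unpacked under C:\hadoop\bin (the paths and version are assumptions; adjust them to your machine). It must run before the SparkSession is created:

    import os

    # Assumed install locations -- adjust to your environment.
    os.environ["HADOOP_HOME"] = "C:\\hadoop"            # must contain bin\winutils.exe and bin\hadoop.dll
    os.environ.setdefault("SPARK_HOME", "C:\\spark")    # only needed if SPARK_HOME is not already set
    # Put the Hadoop binaries ahead of everything else on the PATH
    os.environ["PATH"] = os.path.join(os.environ["HADOOP_HOME"], "bin") + os.pathsep + os.environ["PATH"]

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("MySQL Incremental Load")
             .getOrCreate())

    Note that the winutils build must match the Hadoop version your Spark distribution was compiled against; on some machines hadoop.dll also needs to be copied to C:\Windows\System32.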

    Finally, make sure the user running the Spark job has the necessary permissions to access the Parquet file and write to it.

    I hope this helps.

    Please let me know if you have any further questions.

