from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit
import mysql.connector
from azure.storage.filedatalake import DataLakeServiceClient
import os
spark = SparkSession.builder \
.appName("MySQL Incremental Load") \
.config("spark.driver.extraClassPath", "D:/mysql-connector-java-8.0.22/mysql-connector-java-8.0.22.jar") \
.getOrCreate()
jdbc_url = "jdbc:mysql://IP:PORT/Database"
properties = {
"user": "root",
"password": "pwd",
"driver": "com.mysql.cj.jdbc.Driver"
}
last_processed_df = spark.read.csv("C:\\Users\\Intel\\Downloads\\Untitled Folder\\file.csv", header=True)
last_processed_id_row = last_processed_df.filter(last_processed_df["table"] == "record").collect()[0]
last_processed_id = last_processed_id_row["last_id"]
query = f"(SELECT * FROM record_backup WHERE Unique_identifier > {last_processed_id}) as tmp"
df_incremental = spark.read.jdbc(jdbc_url, query, properties=properties)
local_parquet_path = "temp_incremental_data.parquet"
df_incremental.write.mode("overwrite").parquet(local_parquet_path)
service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
"https", "storage_name"), credential="==")
file_system_client = service_client.get_file_system_client(file_system="container")
directory_client = file_system_client.get_directory_client("blob folder")
file_client = directory_client.get_file_client("exisiting_file_name.parquet")
with open(local_parquet_path, 'rb') as local_file:
file_client.append_data(local_file.read(), offset=0, length=None)
file_client.flush_data(local_file.tell())
new_max_id = df_incremental.agg({"Unique_identifier": "max"}).collect()[0][0]
updated_df = last_processed_df.withColumn("last_id",
when(last_processed_df["table"] == "record", lit(new_max_id))
.otherwise(last_processed_df["last_id"]))
updated_df.write.mode("overwrite").option("header", "true").csv("C:\\Users\\Intel\\Downloads\\Untitled Folder\\file.csv")
os.remove(local_parquet_path)
spark.stop()
Error :
Py4JJavaError: An error occurred while calling o40.parquet.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)