I have a SQL table containing a list of files stored in ADLS Gen2 storage, and I need to load those files into a dataframe.
The location changes for each file because the data is partitioned by date; an example of five such files is below.
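(Illustrative paths only; the storage account, container, and file names are placeholders, not my real ones:)

abfss://container@storageaccount.dfs.core.windows.net/raw/2021/01/01/file_20210101.parquet
abfss://container@storageaccount.dfs.core.windows.net/raw/2021/01/02/file_20210102.parquet
abfss://container@storageaccount.dfs.core.windows.net/raw/2021/01/03/file_20210103.parquet
abfss://container@storageaccount.dfs.core.windows.net/raw/2021/01/04/file_20210104.parquet
abfss://container@storageaccount.dfs.core.windows.net/raw/2021/01/05/file_20210105.parquet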
I need to load these files to a dataframe with the filename appended then deduplicate and load the result into a sql table.
import pandas as pd
import pyodbc

# The SparkSession 'spark' used below is predefined in Synapse notebooks
server = 'server.sql.azuresynapse.net'
database = 'db'
username = 'sqladminuser'
password = 'pwd'
cnxn = pyodbc.connect(f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}')

# Load the list of file paths into a pandas dataframe
df = pd.read_sql('SELECT path FROM dbo.SourceFiles', cnxn)
# Iterate through the files
for index, row in df.iterrows():
    # Read this file from ADLS into a Spark dataframe
    spark_df = spark.read.load(row["path"], format='parquet')
    spark_df.show(10)
    print('Converting to Pandas.')
    pdf = spark_df.toPandas()
    pdf = pdf.assign(Filename=row["path"])
    pdf = pdf.drop_duplicates()
    print(pdf)
The problem with my code is that the final pdf dataframe only contains the last file that was processed, because pdf is overwritten on each iteration. Is it possible to use this method and append the data from each iteration instead?
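For example, would collecting each per-file dataframe in a list and combining them with pd.concat after the loop be the right way to do this? A rough, untested sketch of what I mean:

frames = []
for index, row in df.iterrows():
    spark_df = spark.read.load(row["path"], format='parquet')
    pdf = spark_df.toPandas()
    pdf = pdf.assign(Filename=row["path"])
    frames.append(pdf)  # keep every file's rows instead of overwriting pdf

# Combine all files into one dataframe, then deduplicate across the full set
result = pd.concat(frames, ignore_index=True).drop_duplicates()
print(result)

For the final load into the SQL table, I was planning to use DataFrame.to_sql with a SQLAlchemy engine (the target table name dbo.DedupedFiles is just a placeholder):

from sqlalchemy import create_engine

engine = create_engine(
    f'mssql+pyodbc://{username}:{password}@{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server'
)
result.to_sql('DedupedFiles', engine, schema='dbo', if_exists='replace', index=False)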