Hi Team,
We have a requirement to read a large, complex nested JSON dataset (nearly 50 million records) and convert it into a brand-new nested complex JSON. (Note: the entire schema differs between the input and output JSON files, including nesting levels, column names, etc.)
We are following the approach below using PySpark, but since this is our first project on Databricks, we would appreciate suggestions from experts like you:
- We read the input JSON files from ADLS into Databricks.
- We flatten the entire nested complex DataFrame into a single-level DataFrame.
- We then transform the flattened DataFrame into a DataFrame matching the output schema, using struct and array field types. (Note: as the input and output schemas are completely different, we do this mapping manually. For example, our output file has around 250 attributes nested up to 5 levels. Since we did not find a way to pass an output schema while writing a DataFrame to JSON, we apply the schema on top of the flattened DataFrame before writing it to JSON, renaming the attributes to match the output schema.)
For example, we use the code below to flatten the nested JSON:
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import ArrayType, StructType

def flatten(df):
    # Compute complex fields (lists and structs) in the schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing: " + col_name + ", Type: " + str(type(complex_fields[col_name])))
        # If StructType, expand the struct's fields into top-level columns
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)
        # If ArrayType, add the array elements as rows using explode_outer
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))
        # Recompute the remaining complex fields in the schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df
For the schema conversion:
df1 = (df.withColumn("jsn", struct(
            "id", "name", "type",
            struct("image_height", "image_url",
                   struct("image_width").alias("im_w"),
                   array(struct(df.image_rating_avg, df.image_rating_good)).alias("rating")
                   ).alias("image"),
            struct("thumbnail_height", "thumbnail_url", "thumbnail_width").alias("thumbnail")))
         .drop(*cols)  # cols: list of the flattened source columns to remove
         .select("jsn.*"))
Please add your suggestions here and advise us on this approach.
Thank you