Data transformation approach for JSON schema using PySpark

Birajdar, Sujata 61 Reputation points
2021-11-11T07:42:12.833+00:00

Hi Team,

We have a requirement to read a large, complex nested JSON file (nearly 50 million records) and convert it to a brand-new nested complex JSON. (Note: the input and output schemas are entirely different, including nesting levels, column names, etc.)

We are following the approach below using PySpark, but since this is our first project with Databricks, we would appreciate suggestions from experts like you.

  1. We read the input JSON files from ADLS into Databricks.
  2. We flatten the entire nested complex DataFrame into a single-level DataFrame.
  3. We then transform the flattened DataFrame into a DataFrame with the required output schema using struct and array field types. (Note: as our input and output schemas are completely different, we are doing this manually. For example, our output file has around 250 attributes nested up to 5 levels. Since we didn't find any way to pass an output schema while writing a DataFrame to JSON, we apply the schema on top of the flattened DataFrame before writing it to JSON, and we rename the attributes as per the output schema.)
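For step 1, a sketch of reading from ADLS Gen2 is below. The storage account, container, and key names are placeholders, not from the original post. One practical note: supplying an explicit schema to `spark.read.json` avoids an extra full pass over the 50 million records for schema inference.

```python
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Placeholder account/container names; configure auth as appropriate for your workspace.
spark.conf.set(
    "fs.azure.account.key.<storage-account>.dfs.core.windows.net",
    "<access-key>")

# Explicit input schema: skips Spark's schema-inference pass over the full dataset.
input_schema = StructType([
    StructField("id", LongType()),
    StructField("name", StringType()),
    # ... remaining fields of the real input schema
])

df = (spark.read
      .schema(input_schema)
      .json("abfss://<container>@<storage-account>.dfs.core.windows.net/input/*.json"))
```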

For example, we are using the code below to flatten the nested JSON:

```python
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import ArrayType, StructType

def flatten(df):
    # compute complex fields (lists and structs) in the schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # if StructType, expand the struct's fields into top-level columns
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType, add the array elements as rows using explode_outer
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # recompute the remaining complex fields in the schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df
```

For the schema conversion:

```python
df1 = (df.withColumn("jsn",
           struct("id", "name", "type",
                  struct("image_height", "image_url",
                         struct("image_width").alias("im_w"),
                         array(struct(df.image_rating_avg, df.image_rating_good)).alias("rating")
                  ).alias("image"),
                  struct("thumbnail_height", "thumbnail_url", "thumbnail_width").alias("thumbnail")))
         .drop(*cols)
         .select("jsn.*"))
```

Please add your suggestions here and advise us on the approach.

Thank you


1 answer

  1. HimanshuSinha-msft 19,476 Reputation points Microsoft Employee
    2021-11-15T20:32:39.107+00:00

    Hello @Birajdar, Sujata ,
    Thanks for using Q&A and for sharing the code. I think in theory the idea looks fine. If you could share the input JSON and the output JSON, that would be more helpful.

    One other point: can you please elaborate more on
    "Now we are transforming the flatten dataframe to o/p schema level dataframe with required schema using struct and array field types (Note: As our i/p and o/p schemas are completely different)"

    Thanks
    Himanshu

