Data profiling for a Spark DataFrame

Varun S Kumar

Hi everyone,

I am doing some ETL operations on very large datasets with millions or even billions of records. One of my pain points is profiling the data for nulls, duplicates, unique values and junk. I am reading the data from CSV files and doing the operations on the DataFrame. The results are written into a Postgres table.

My concern is the time it takes (hours) to profile the entire dataset, since I need the profile separately for each column. I am sharing the code I am using right now. I am new to all this, so please excuse the code quality.

Any help getting faster results is appreciated.

import json
from datetime import datetime

from pyspark.sql.functions import col, count, countDistinct, when


def data_profiling(self, cursor, df, attribute_ids, existing_attribute_ids):
    total_rows = df.count()

    # Each statement below is a separate Spark job that rescans the input unless df is cached.
    null_count = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0]
    unique_count = df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns)).collect()[0]
    # Number of distinct rows (across all columns) that occur more than once.
    duplicate_count = df.groupBy(df.columns).count().filter("count > 1").count()
    # "Junk": non-null values that do not start with a letter or digit.
    junk_count = df.select([count(when(~col(c).rlike("^[A-Za-z0-9]"), c)).alias(c) for c in df.columns]).collect()[0]

    # Calculate percentages
    null_percentage = {c: (null_count[c] / total_rows) * 100 for c in df.columns}
    unique_percentage = {c: (unique_count[c] / total_rows) * 100 for c in df.columns}
    duplicate_percentage = (duplicate_count / total_rows) * 100 if duplicate_count > 0 else 0
    junk_percentage = {c: (junk_count[c] / total_rows) * 100 for c in df.columns}

    profile_query = "INSERT INTO profiles (profile_type, payload, updated_at, created_at, attribute_id) VALUES (%s, %s, %s, %s, %s)"

    for column in df.columns:
        if column == "record_id":
            continue  # assumed: the record key column itself is not profiled
        attribute_id = attribute_ids.get(column)

        if attribute_id is not None and attribute_id not in existing_attribute_ids:
            creation_time = datetime.now()  # assumed: the original right-hand side was not shown

            null_payload = {"count": null_count[column], "percentage": null_percentage[column]}
            unique_payload = {"count": unique_count[column], "percentage": unique_percentage[column]}
            # Whole-row metric, so the same duplicate figures are stored for every column.
            duplicate_payload = {"count": duplicate_count, "percentage": duplicate_percentage}
            junk_payload = {"count": junk_count[column], "percentage": junk_percentage[column]}

            cursor.execute(profile_query, ("null", json.dumps(null_payload), creation_time, creation_time, attribute_id))
            cursor.execute(profile_query, ("unique", json.dumps(unique_payload), creation_time, creation_time, attribute_id))
            cursor.execute(profile_query, ("duplicate", json.dumps(duplicate_payload), creation_time, creation_time, attribute_id))
            cursor.execute(profile_query, ("junk", json.dumps(junk_payload), creation_time, creation_time, attribute_id))

Azure Databricks

Accepted answer
  BhargavaGunnam-MSFT (Microsoft Employee)

    <Since the Microsoft Q&A community has a policy that "The question author cannot accept their own answer. They can only accept answers by others", I'll repost your solution in case you'd like to "Accept" the answer>


    • Performance-tune data profiling on the DataFrame (the query is in the question).


    • Converting the CSVs to Parquet gave a significant performance improvement. Parquet is a columnar storage format optimized for analytics workloads: Spark reads typed, compressed column data instead of parsing every line of text on each pass, and it can skip columns that a query does not need, which a row-based format like CSV does not allow.

    If I missed anything, please let me know and I'd be happy to add it to my answer, or feel free to comment below with any additional information.

    Please remember to "Accept Answer" if any answer/reply helped, so that others in the community facing similar issues can easily find the solution.
