Hi everyone,
I am doing ETL on very large datasets with millions or even billions of records. One of my pain points is profiling the data for nulls, duplicates, unique values, and junk values (values that do not start with an alphanumeric character). I read the data from CSV using spark.read.csv, run the profiling on the resulting DataFrame, and write the results into a Postgres table.
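For context, this is roughly how the data is loaded (the path and the read options here are placeholders, not my real ones):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("data-profiling").getOrCreate()

    # Placeholder input path; the real job reads CSVs with millions/billions of rows.
    df = spark.read.csv("/data/input/*.csv", header=True, inferSchema=True)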
My concern is the time it takes (hours) to profile the entire dataset, since I want the metrics separately for each column. Below is the code I am currently using. I am new to all this, so please excuse the code quality.
Any help getting faster results is appreciated.
# Module-level imports used by this method
import datetime
import json

from pyspark.sql.functions import col, count, countDistinct, when


def data_profiling(self, cursor, df, attribute_ids, existing_attribute_ids):
    total_rows = df.count()

    # Each of these metrics triggers its own full pass over the DataFrame.
    # Per-column null counts:
    null_count = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0]
    # Per-column distinct-value counts:
    unique_count = df.agg(*(countDistinct(c).alias(c) for c in df.columns)).collect()[0]
    # Number of distinct full-row values that occur more than once:
    duplicate_count = df.groupBy(df.columns).count().filter("count > 1").count()
    # Per-column "junk" counts: values that do not start with an alphanumeric character.
    junk_count = df.select([count(when(~col(c).rlike("^[A-Za-z0-9]"), c)).alias(c) for c in df.columns]).collect()[0]

    # Calculate percentages (duplicates are measured across whole rows, not per column).
    null_percentage = {c: (null_count[c] / total_rows) * 100 for c in df.columns}
    unique_percentage = {c: (unique_count[c] / total_rows) * 100 for c in df.columns}
    duplicate_percentage = (duplicate_count / total_rows) * 100 if duplicate_count > 0 else 0
    junk_percentage = {c: (junk_count[c] / total_rows) * 100 for c in df.columns}

    try:
        for column in df.columns:
            if column == "record_id":
                continue
            attribute_id = attribute_ids.get(column)
            if attribute_id is not None and attribute_id not in existing_attribute_ids:
                creation_time = datetime.datetime.now()
                null_payload = {"count": null_count[column], "percentage": null_percentage[column]}
                unique_payload = {"count": unique_count[column], "percentage": unique_percentage[column]}
                duplicate_payload = {"count": duplicate_count, "percentage": duplicate_percentage}
                junk_payload = {"count": junk_count[column], "percentage": junk_percentage[column]}
                # One row in the profiles table per profile type per column.
                profile_query = "INSERT INTO profiles (profile_type, payload, updated_at, created_at, attribute_id) VALUES (%s, %s, %s, %s, %s)"
                cursor.execute(profile_query, ("null", json.dumps(null_payload), creation_time, creation_time, attribute_id))
                cursor.execute(profile_query, ("unique", json.dumps(unique_payload), creation_time, creation_time, attribute_id))
                cursor.execute(profile_query, ("duplicate", json.dumps(duplicate_payload), creation_time, creation_time, attribute_id))
                cursor.execute(profile_query, ("junk", json.dumps(junk_payload), creation_time, creation_time, attribute_id))
        cursor.connection.commit()  # was conn.commit(); committing via the cursor's connection so the snippet is self-contained
    except Exception:
        cursor.connection.rollback()  # roll back the partial inserts if anything fails
        raise
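And this is roughly how the function gets invoked (the psycopg2 connection details and the attribute-id mappings below are made-up placeholders, and profiler stands in for the instance of the class this method lives on):

    import psycopg2

    # Placeholder credentials, not my real database.
    conn = psycopg2.connect(host="localhost", dbname="profiling", user="etl", password="secret")
    cursor = conn.cursor()

    # attribute_ids maps each column name to its id in the attributes table;
    # existing_attribute_ids holds ids that were already profiled. Both are invented here.
    profiler.data_profiling(cursor, df, attribute_ids={"name": 1, "email": 2}, existing_attribute_ids=set())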