Indexing a PySpark DataFrame

Varun S Kumar 50 Reputation points
2024-05-09T07:29:38.0266667+00:00

Hey guys,

I have a very large dataset stored as multiple Parquet files (around 20,000 small files) which I am reading into a PySpark DataFrame. I want to add an index column to this DataFrame and then do some data profiling and data quality check activities. I'm sharing a portion of the code below.
I've tried both monotonically_increasing_id and zipWithIndex. Every forum I've seen says zipWithIndex is best for performance, but for me it's the other way around. Here are my benchmarks for indexing the table with both:

Parquet Size: 1.3 GB (around 15 GB if it's in CSV format)
Total Row Count: 1466764
Total Column Count: 900

Time taken for mono_id: 0.10401535034179688 seconds
mono_df.rdd.getNumPartitions() = 1

Time taken for zip_id: 250.9147379398346 seconds
zip_df.rdd.getNumPartitions() = 350

You can see that monotonically_increasing_id finished in a split second while zipWithIndex took more than 4 minutes. But the number of partitions after monotonically_increasing_id came down to just one. The original DataFrame had 350 partitions when it was read, and that count was maintained after zipWithIndex.

Now, after indexing, while running my profiling code, mono_df takes an average of 15 seconds to profile a column while zip_df takes an average of 30 minutes.
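In case it's relevant, here is a minimal sketch of caching the indexed DataFrame once before the profiling loop, so the per-column passes reuse materialized data instead of recomputing the index (untested on my data; zip_df is only used as the example):

# Rough sketch (untested): materialize the indexed DataFrame once so that
# each per-column profiling pass does not recompute the zipWithIndex step.
zip_df = zip_df.cache()
zip_df.count()  # an action to actually populate the cache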

I'm a newbie to PySpark and Databricks. What am I doing wrong here, and how can I improve the performance?

from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, LongType

spark = SparkSession.builder.appName("Example").getOrCreate()

# spark.sql.files.maxPartitionBytes comes back as a string like "134217728b"
partition_size = spark.conf.get("spark.sql.files.maxPartitionBytes").replace("b", "")
print(f"Partition Size: {int(partition_size) / 1024 / 1024} MB")

df_no_schema = spark.read.parquet('dbfs:parquet_folder/')
print(f"Number of Partitions: {df_no_schema.rdd.getNumPartitions()}")
print(df_no_schema.count())

columns = df_no_schema.columns
row_with_index = Row(*columns, "index")

def create_new_schema(df_no_schema):
    # Original schema plus a non-nullable "index" column
    new_schema = StructType(df_no_schema.schema.fields[:] + [StructField("index", LongType(), False)])
    return new_schema

def zip_rdd(df_no_schema, new_schema):
    # Index via RDD zipWithIndex: each row gets paired with its position
    zipped_rdd = df_no_schema.rdd.zipWithIndex()
    df = zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema)
    return df

def mono_id(df_no_schema):
    # Index via row_number() over a window ordered by monotonically_increasing_id()
    window_spec = Window.orderBy(F.monotonically_increasing_id())
    df = df_no_schema.withColumn("index", F.row_number().over(window_spec))
    return df

new_schema = create_new_schema(df_no_schema)
mono_df = mono_id(df_no_schema)
print(f"Number of Partitions: {mono_df.rdd.getNumPartitions()}")
zip_df = zip_rdd(df_no_schema, new_schema)
print(f"Number of Partitions: {zip_df.rdd.getNumPartitions()}")

And this is the profiling code that follows:

import time

# df is the indexed DataFrame (mono_df or zip_df); total_rows is counted once up front
total_rows = df.count()

for column in df.columns:
    start_time = time.time()
    unique_count = df.select(column).distinct().count()
    unique_percentage = (unique_count / total_rows) * 100
    duplicate_count = total_rows - unique_count
    duplicate_percentage = (duplicate_count / total_rows) * 100 if duplicate_count > 0 else 0
    null_count = df.filter(df[column].isNull()).count()
    null_percentage = (null_count / total_rows) * 100
    quality = int(unique_percentage / 10)
    quality_string = f"{quality}/10"

    unique_payload = {"count": unique_count, "percentage": unique_percentage}
    duplicate_payload = {"count": duplicate_count, "percentage": duplicate_percentage}
    quality_payload = {"count": "", "percentage": quality_string}
    null_payload = {"count": null_count, "percentage": null_percentage}
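
For reference, here is a minimal sketch of collecting the distinct and null counts for a column in a single aggregation pass instead of two separate jobs (untested; note that countDistinct ignores nulls, so it can differ slightly from distinct().count()):

# Rough sketch (untested): one aggregation job per column instead of a
# distinct().count() job plus a separate isNull() filter/count job.
stats = df.agg(
    F.countDistinct(F.col(column)).alias("unique_count"),
    F.count(F.when(F.col(column).isNull(), 1)).alias("null_count"),
).collect()[0]
unique_count = stats["unique_count"]
null_count = stats["null_count"]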