Indexing a PySpark DataFrame
Hey guys,
I have a very large dataset stored as multiple parquet files (around 20,000 small files) which I'm reading into a PySpark DataFrame. I want to add an index column to this DataFrame and then do some data profiling and data quality checks. I'm sharing a portion of the code.
I've tried both monotonically_increasing_id and zipWithIndex. Every forum I've seen says zipWithIndex is best for performance, but for me it's the other way around. Here are my benchmarks for indexing the table both ways:
Parquet Size: 1.3 GB (around 15 GB if it's in CSV format)
Total Row Count: 1466764
Total Column Count: 900
Time taken for mono_id: 0.104 seconds
mono_df.rdd.getNumPartitions() = 1
Time taken for zip_id: 250.91 seconds
zip_df.rdd.getNumPartitions() = 350
You can see that monotonically_increasing_id finished in a split second while zipWithIndex took more than 4 minutes. However, the number of partitions after monotonically_increasing_id came down to just one; the original DataFrame had 350 partitions when read, and zipWithIndex maintained that.
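One thing I suspect with the mono_id timing: Spark transformations are lazy, so a timer around just the withColumn call may only measure building the query plan, not executing it. A pure-Python analogy of that lazy-vs-eager difference (my own illustration, not Spark code):

```python
import time

# Pure-Python analogy (not Spark): building a lazy pipeline is near-instant,
# while actually executing it takes real time. A generator stands in for a
# lazy transformation; sum() stands in for an action that forces execution.
start = time.time()
lazy = (x + 1 for x in range(1_000_000))   # "transformation": nothing computed yet
t_build = time.time() - start

start = time.time()
total = sum(lazy)                          # "action": iterates all million elements
t_run = time.time() - start

print(f"build: {t_build:.6f}s, run: {t_run:.6f}s")
```

By the same logic, a fair benchmark would force an action (e.g. .count()) inside the timed region for both approaches.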
Now, after indexing, my profiling code takes an average of 15 seconds per column on mono_df but an average of 30 minutes per column on zip_df.
I'm a newbie to PySpark and Databricks. What am I doing wrong here, and how can I improve the performance?
import time

from pyspark.sql import Row, SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import LongType, StructField, StructType

spark = SparkSession.builder.appName("Example").getOrCreate()

# spark.sql.files.maxPartitionBytes comes back as a string like "134217728b"
partition_size = spark.conf.get("spark.sql.files.maxPartitionBytes").replace("b", "")
print(f"Partition Size: {int(partition_size) / 1024 / 1024} MB")

df_no_schema = spark.read.parquet('dbfs:parquet_folder/')
print(f"Number of Partitions: {df_no_schema.rdd.getNumPartitions()}")
print(df_no_schema.count())

columns = df_no_schema.columns
row_with_index = Row(*columns, "index")

def create_new_schema(df_no_schema):
    # Append a non-nullable LongType "index" field to the existing schema
    new_schema = StructType(df_no_schema.schema.fields[:] + [StructField("index", LongType(), False)])
    return new_schema

def zip_rdd(df_no_schema, new_schema):
    # zipWithIndex pairs each row with a contiguous 0-based index
    zipped_rdd = df_no_schema.rdd.zipWithIndex()
    df = zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema)
    return df

def mono_id(df_no_schema):
    # A global window (no partitionBy), ordered by monotonically_increasing_id
    window_spec = Window().orderBy(F.monotonically_increasing_id())
    df = df_no_schema.withColumn("index", F.row_number().over(window_spec))
    return df

new_schema = create_new_schema(df_no_schema)

mono_df = mono_id(df_no_schema)
print(f"Number of Partitions: {mono_df.rdd.getNumPartitions()}")

zip_df = zip_rdd(df_no_schema, new_schema)
print(f"Number of Partitions: {zip_df.rdd.getNumPartitions()}")
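As an aside, the row shape that zip_rdd's map lambda consumes can be sanity-checked in plain Python: enumerate is roughly the analogue of zipWithIndex, except it yields (index, row) rather than (row, index). A toy sketch with made-up rows:

```python
# Toy stand-in for the RDD's rows (hypothetical data, not from my dataset)
rows = [("alice", 30), ("bob", 25)]

# rdd.zipWithIndex() yields (row, index); enumerate yields (index, row),
# so the tuple order is swapped here. The map in zip_rdd then appends the
# index as a trailing column, mirroring list(ri[0]) + [ri[1]].
zipped = list(enumerate(rows))
with_index = [list(row) + [i] for i, row in zipped]
print(with_index)  # each row gains a trailing index column
```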
And this is the profiling code that follows:
# df here is the indexed DataFrame (mono_df or zip_df)
total_rows = df.count()

for column in df.columns:
    start_time = time.time()
    unique_count = df.select(column).distinct().count()
    unique_percentage = (unique_count / total_rows) * 100
    duplicate_count = total_rows - unique_count
    duplicate_percentage = (duplicate_count / total_rows) * 100 if duplicate_count > 0 else 0
    null_count = df.filter(df[column].isNull()).count()
    null_percentage = (null_count / total_rows) * 100
    quality = int(unique_percentage / 10)
    quality_string = f"{quality}/10"
    unique_payload = {"count": unique_count, "percentage": unique_percentage}
    duplicate_payload = {"count": duplicate_count, "percentage": duplicate_percentage}
    quality_payload = {"count": "", "percentage": quality_string}
    null_payload = {"count": null_count, "percentage": null_percentage}
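For clarity, here is the per-column arithmetic from that loop on its own, using my actual row count and a made-up unique_count (the quality score is just the uniqueness percentage divided by ten and truncated):

```python
# Standalone check of the loop's arithmetic. total_rows is from my benchmark;
# unique_count is hypothetical (exactly half the rows unique).
total_rows = 1466764
unique_count = 733382

unique_percentage = (unique_count / total_rows) * 100
duplicate_count = total_rows - unique_count
duplicate_percentage = (duplicate_count / total_rows) * 100 if duplicate_count > 0 else 0

quality = int(unique_percentage / 10)   # truncate, not round
quality_string = f"{quality}/10"
print(unique_percentage, duplicate_count, quality_string)
```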