Hello Rajeev Singh,
Sorry for the delayed response.
It looks like the DataFrame df is empty, i.e. it has no columns, which causes the error when creating the Delta table. To resolve this, add a condition that checks whether the DataFrame has any columns before calling write_delta_v2.

The code below skips the collections that have no columns. Please try it and let me know.
from pyspark.sql import SparkSession

# Create the SparkSession once, outside the loop
spark = (
    SparkSession.builder.master("local")
    .appName("Get all columns")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .getOrCreate()
)

for table_name in collectionNames:
    # Read the current collection ("xxxxxxx" is your database name placeholder)
    df = (
        spark.read.format("mongo")
        .option("database", "xxxxxxx")
        .option("spark.mongodb.input.uri", jdbc_hostname)
        .option("collection", table_name)
        .load()
    )
    df.show()
    print(f"Data count for the table {table_name} from {prev_time} to {current_time} is: {df.count()}")

    tgt_table_name = table_name

    # Only write to Delta if the DataFrame actually has columns
    if len(df.columns) > 0:
        if action == 'upsert':
            write_delta_v2(type='data_copy', df=df, mode=action, dbname=tgt_db_name,
                           targetTable=tgt_table_name, sourceTable=tgt_table_name, keyCol='id')
        else:
            write_delta_v2(type='data_copy', df=df, mode=action, dbname=tgt_db_name,
                           targetTable=tgt_table_name)
    else:
        print(f"Skipping table {table_name} as it has no columns.")