MongoDB database copy to Azure Databricks

Rajeev Singh 21 Reputation points
2023-05-04T08:50:29.52+00:00

Hi,

I have setup MongoDB to Azure Databricks data tranfer. I am able to retrive all table name from database but unable to get data from those tables.

I am receiving error message as "Data used in creating the Delta table doesn't have any columns.". Please help to resolve this issue.

Regards,

Rajeev

Azure Databricks
Azure Databricks
An Apache Spark-based analytics platform optimized for Azure.
1,917 questions
{count} votes

1 answer

Sort by: Most helpful
  1. BhargavaGunnam-MSFT 26,136 Reputation points Microsoft Employee
    2023-05-11T19:20:38.1166667+00:00

    Hello Rajeev Singh,

    Sorry for the delayed response.

    It seems like the DataFrame df is empty or has no columns, causing the error when creating a Delta table. To resolve this issue, you can add a condition to check if the DataFrame has columns before proceeding with the write_delta_v2 function

    The below code will skip the tables with no columns.
    please try and let me know.

    from pyspark.sql import SparkSession
    import pandas as pd
    
    for table_name in collectionNames:
        # Create a SparkSession
        spark = SparkSession.builder.master('local').appName("Get all columns").config('org.mongodb.spark:mongo-spark-connector_2.12:3.0.1').getOrCreate()
    
        df = spark.read.format("mongo").option("database", "xxxxxxx").option("spark.mongodb.input.uri", jdbc_hostname).option("collection","yyyyyyyy").load()
        df.show()
        print(f"Data count for the table {table_name} from {prev_time} to {current_time} is: {df.count()}")
    
        tgt_db_name = tgt_db_name
        tgt_table_name = table_name
    
        # Check if the DataFrame has columns
        if len(df.columns) > 0:
            if action == 'upsert':
                write_delta_v2(type = 'data_copy', df=df, mode=action, dbname= tgt_db_name, targetTable=tgt_table_name, sourceTable=tgt_table_name, keyCol='id')
            else:
                write_delta_v2(type = 'data_copy', df=df, mode=action, dbname= tgt_db_name, targetTable=tgt_table_name)
        else:
            print(f"Skipping table {table_name} as it has no columns.")