Error [CONTEXT_ONLY_VALID_ON_DRIVER] in Databricks

Shambhu Rai 1,411 Reputation points
2024-01-15T17:30:23.7233333+00:00

Hi Expert, I am using the below UDF function in a Databricks merge condition but getting the error 'pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.', from <command-2591425064644209>, line 10. Full traceback below:

function attached in notepad




Azure Databricks
An Apache Spark-based analytics platform optimized for Azure.

1 answer

  1. PRADEEPCHEEKATLA 90,641 Reputation points Moderator
    2024-01-16T01:56:30.1333333+00:00

    Shambhu Rai - Thanks for the question and using MS Q&A platform.

    The error message indicates that you are trying to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that runs on workers.

    In your code, it seems that you are passing the spark object to the ufnGetGlobalSiteIdForTxns function, which is causing the error. The body of a Python UDF is executed on the workers, so it can never use the spark object or call spark.sql — creating a SparkSession inside the UDF would fail for the same reason. Since ufnGetGlobalSiteIdForTxns is already a SQL function, the simplest fix is to call it directly in the source query of the MERGE statement, so everything runs as a single Spark SQL plan driven from the driver.

    Here's an updated version of your code:

    spark.sql("""
    MERGE INTO ErrorDeleted AS target
        USING (
            SELECT *,
                   ufnGetGlobalSiteIdForTxns(CHF_PICKUP_COMPANY, CHF_SUPPLY_COUNTRY, 'CDX',
                                             CHF_GLOPS_LOS_NO_X, CHF_EXTERNAL_LOS_NO) AS GlobalSiteid
            FROM source_query
        ) AS source
        ON (target.txnguid = source.txnguid)
        WHEN MATCHED THEN
        UPDATE SET
            GlobalSiteid = source.GlobalSiteid
    """)


    In this updated code, ufnGetGlobalSiteIdForTxns is invoked directly inside the MERGE statement's source query instead of being wrapped in a Python UDF. No Python function runs on the workers, so no SparkContext is referenced from worker code and the [CONTEXT_ONLY_VALID_ON_DRIVER] error does not occur.
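    If you do still need to assemble the function call as a string on the driver (as your original .format-based code did), escape the string literals so a value containing a quote cannot produce malformed SQL. A minimal pure-Python sketch — the helper names here are illustrative, not part of any Spark or Databricks API:

    ```python
    def sql_literal(value):
        """Render a Python value as a single-quoted SQL string literal,
        doubling any embedded single quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    def build_site_id_call(pickup_company, supply_country, glops_los_no_x, external_los_no):
        """Build the ufnGetGlobalSiteIdForTxns(...) call expression as text,
        with the constant 'CDX' third argument used in the answer above."""
        args = [
            sql_literal(pickup_company),
            sql_literal(supply_country),
            sql_literal("CDX"),
            sql_literal(glops_los_no_x),
            sql_literal(external_los_no),
        ]
        return "ufnGetGlobalSiteIdForTxns({})".format(", ".join(args))
    ```

    For example, build_site_id_call("ACME", "US", "L1", "X9") returns ufnGetGlobalSiteIdForTxns('ACME', 'US', 'CDX', 'L1', 'X9'), and a value like O'Brien is rendered as 'O''Brien' rather than breaking the query.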

    Hope this helps. Do let us know if you have any further queries.


    If this answers your query, do click Accept Answer and Yes for was this answer helpful. And, if you have any further query do let us know.

    1 person found this answer helpful.
