Shambhu Rai - Thanks for the question and for using the MS Q&A platform.
The error message indicates that you are trying to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that runs on the workers.
In your code, it seems that you are passing the spark object to the ufnGetGlobalSiteIdForTxns function, which is causing the error. You should not pass the spark object to the function. Instead, you can obtain a SparkSession object inside the function with SparkSession.builder.getOrCreate() and use it to perform the required operations.
Here's an updated version of your code:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

def get_global_site_id(pickup_company, supply_country, glops_los_no_x, external_los_no):
    # Obtain a session inside the function instead of passing `spark` in.
    # Note: a UDF body runs on the workers, so verify that this call works in your environment.
    spark_session = SparkSession.builder.getOrCreate()
    query = "SELECT ufnGetGlobalSiteIdForTxns('{}', '{}', 'CDX', '{}', '{}')".format(
        pickup_company, supply_country, glops_los_no_x, external_los_no
    )
    # collect() returns a list of Rows; take the first column of the first row.
    return spark_session.sql(query).collect()[0][0]

# Register the function under a name so that it can be called from spark.sql().
udf_get_global_site_id = spark.udf.register("udf_get_global_site_id", get_global_site_id, StringType())

spark.sql("""
    MERGE INTO ErrorDeleted AS target
    USING (
        SELECT *,
               udf_get_global_site_id(CHF_PICKUP_COMPANY, CHF_SUPPLY_COUNTRY, CHF_GLOPS_LOS_NO_X, CHF_EXTERNAL_LOS_NO) AS GlobalSiteid
        FROM source_query
    ) AS source
    ON target.txnguid = source.txnguid
    WHEN MATCHED THEN
        UPDATE SET GlobalSiteid = source.GlobalSiteid
""")
In this updated code, we have created a new function, get_global_site_id, which takes the required parameters and uses the SparkSession object to execute the SQL query. We have also created a UDF, udf_get_global_site_id, which wraps the get_global_site_id function. Finally, we have used the UDF in the MERGE statement to update the GlobalSiteid column.
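One caveat: the body of a Python UDF runs on the workers, so calling SparkSession.builder.getOrCreate() inside it may run into the same driver-only restriction. If ufnGetGlobalSiteIdForTxns is already callable from Spark SQL (which the SELECT inside get_global_site_id assumes), a possible alternative is to call it directly in the MERGE source query, so that no Python UDF is involved. The following is only a minimal sketch under that assumption; build_merge_sql is a hypothetical helper name, and the table and column names are taken from the code above.

```python
def build_merge_sql(source_view: str = "source_query") -> str:
    """Build a MERGE statement that calls the SQL function directly.

    Assumes ufnGetGlobalSiteIdForTxns is resolvable by Spark SQL;
    build_merge_sql itself is a hypothetical helper for illustration.
    """
    return f"""
MERGE INTO ErrorDeleted AS target
USING (
    SELECT *,
           ufnGetGlobalSiteIdForTxns(
               CHF_PICKUP_COMPANY, CHF_SUPPLY_COUNTRY, 'CDX',
               CHF_GLOPS_LOS_NO_X, CHF_EXTERNAL_LOS_NO
           ) AS GlobalSiteid
    FROM {source_view}
) AS source
ON target.txnguid = source.txnguid
WHEN MATCHED THEN
    UPDATE SET target.GlobalSiteid = source.GlobalSiteid
"""


merge_sql = build_merge_sql()
# On the driver (for example in a notebook cell), run it with:
# spark.sql(merge_sql)
```

Because the statement is plain SQL, the spark object is only touched on the driver when spark.sql(merge_sql) is called, and nothing Spark-related has to be serialized to the workers.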
Hope this helps. Do let us know if you have any further queries.
If this answers your query, please click Accept Answer and Yes for "Was this answer helpful".