Convert a Spark SQL function to Delta SQL

Shambhu Rai 1,411 Reputation points
2023-12-10T09:19:21.34+00:00

Hi Expert,

How can I convert the Spark SQL function below to Delta SQL? Any example would be appreciated.

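from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName("example").getOrCreate()

# Define a UDF (User Defined Function) using Spark SQL.
# The return type is declared because the function returns an integer;
# without it, @udf defaults to a string return type.
@udf(returnType=IntegerType())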
def ufnGetGlobalSiteIdForTxns(PickupCompany, SupplyCountry, TxnSource, SiteId, ExternalSiteId):
    GlobalSiteid = 0

    if TxnSource == 'CDX':
        if (PickupCompany == 'BP' and SupplyCountry in (3, 4, 6, 7, 8, 10, 11, 14, 15, 29, 50, 53)) or \
                (PickupCompany in ('AR', 'AS', 'CT', 'ES', 'GU', 'GV', 'JU', 'OT', 'TX', 'MP')):
            # Your CDX logic here
            pass
        elif (PickupCompany == 'MP' and SupplyCountry in (2, 12, 19)) or \
                (PickupCompany in ('AP', 'OM', 'RL', 'ST', 'WE', 'TR')):
            # Your second CDX logic here
            pass
        elif (PickupCompany == 'BP' and SupplyCountry == 5):
            # Your third CDX logic here
            pass
        elif PickupCompany == 'TO':
            # Your fourth CDX logic here
            pass
        elif PickupCompany in ('AV', 'DK', 'Q8'):
            # Your fifth CDX logic here
            pass

    elif TxnSource == 'OAC':
        if PickupCompany == 'TO':
            # Your OAC TO logic here
            pass
        else:
            # Your OAC logic here
            pass

    elif TxnSource == 'HEROES':
        # Your HEROES logic here
        pass

    return GlobalSiteid

# Assuming you have a DataFrame named df
#df = df.withColumn("GlobalSiteid", ufnGetGlobalSiteIdForTxns("PickupCompany", "SupplyCountry", "TxnSource", "SiteId", "ExternalSiteId"))
Azure Data Lake Storage
An Azure service that provides an enterprise-wide hyper-scale repository for big data analytic workloads and is integrated with Azure Blob Storage.
Azure Functions
An Azure service that provides an event-driven serverless compute platform.
Azure Databricks
An Apache Spark-based analytics platform optimized for Azure.
Azure Data Factory
An Azure service for ingesting, preparing, and transforming data at scale.

1 answer

  1. Dillon Silzer 60,701 Reputation points Volunteer Moderator
    2023-12-11T00:58:38.6333333+00:00

    Hi Shambhu,

    If I understand correctly, you want to take the following statement and convert it to DataFrame code so that you can write the result to a table.

    %sql
    MERGE INTO Transactions_Crdx AS TARGET
     USING (SELECT H100.*,
      ufnGetGlobalSiteIdForTxns(H100.H100_PICKUP_COMPANY, H100.H100_SUPPLY_COUNTRY, 'CDX', H100.HT_GLOPS_LOS_NO, H100.H100_EXTERNAL_LOS_NO) AS GLOBAL_SITE_ID
      FROM hist100_tempview AS H100) AS SOURCE
     ON (TARGET.TxnGUID = SOURCE.TxnGUID)
     WHEN MATCHED THEN
      UPDATE
         SET RecordMOdifiedDate = current_timestamp()
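
For a `%sql` cell to call the Python function by name, the function has to be registered with the session first; the `@udf` decorator alone only makes it usable from the DataFrame API. Below is a minimal sketch of that step, not a tested implementation. The helper names `get_global_site_id` and `register_for_sql` are hypothetical, the integer return type is an assumption, and the branch bodies remain placeholders because the question does not show the actual rules.

```python
# Plain Python version of the function from the question. The branch bodies
# are placeholders in the original post, so this sketch always returns 0.
def get_global_site_id(pickup_company, supply_country, txn_source,
                       site_id, external_site_id):
    global_site_id = 0
    if txn_source == "CDX":
        pass  # CDX rules from the question go here
    elif txn_source == "OAC":
        pass  # OAC rules go here
    elif txn_source == "HEROES":
        pass  # HEROES rules go here
    return global_site_id


def register_for_sql(spark):
    """Register the function under the name the MERGE statement calls."""
    # Imported lazily so the plain function can be tested without pyspark.
    from pyspark.sql.types import IntegerType  # assumed return type
    spark.udf.register("ufnGetGlobalSiteIdForTxns",
                       get_global_site_id, IntegerType())
```

After `register_for_sql(spark)` has run in a Python cell, a SQL cell in the same session should be able to call `ufnGetGlobalSiteIdForTxns(...)` with column references as arguments.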
    

    DataFrame code (generated by ChatGPT and not tested, as I do not have a dataset to work with):

    from delta.tables import DeltaTable
    from pyspark.sql import SparkSession

    # Assuming you have a SparkSession named spark

    # Load the target as a Delta table and the source as a DataFrame.
    # The UDF arguments are elided here; pass the same columns as in the SQL above.
    target = DeltaTable.forName(spark, "Transactions_Crdx")
    source_df = spark.sql("SELECT H100.*, ufnGetGlobalSiteIdForTxns(...) AS GLOBAL_SITE_ID FROM hist100_tempview AS H100")

    # Perform the merge. DataFrames have no merge() method, so the Delta Lake
    # DeltaTable API is used, with the join condition given as a SQL string.
    (target.alias("TARGET")
        .merge(source_df.alias("SOURCE"), "TARGET.TxnGUID = SOURCE.TxnGUID")
        .whenMatchedUpdate(set={"RecordMOdifiedDate": "current_timestamp()"})
        .execute())

    # The merge updates Transactions_Crdx in place, so no separate write is needed.
    # Display the result:
    target.toDF().show()
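
If the goal is a function that lives in SQL rather than in Python, one option is a SQL UDF whose body is a `CASE` expression. The sketch below only builds the `CREATE FUNCTION` statement as a string, so it can be inspected before it is run. It assumes a Databricks runtime that supports SQL UDFs; the parameter types are guesses, every `THEN 0` is a placeholder because the question does not show the real results, and only the first CDX branch is spelled out. `in_list` is a hypothetical helper.

```python
def in_list(values):
    """Render Python values as the body of a SQL IN list, quoting strings."""
    return ", ".join(f"'{v}'" if isinstance(v, str) else str(v) for v in values)


# Value lists copied from the first CDX branch of the question's function.
BP_COUNTRIES = (3, 4, 6, 7, 8, 10, 11, 14, 15, 29, 50, 53)
FIRST_GROUP = ("AR", "AS", "CT", "ES", "GU", "GV", "JU", "OT", "TX", "MP")

# Parameter types are assumptions; every THEN value is a placeholder.
CREATE_FUNCTION_SQL = f"""
CREATE OR REPLACE FUNCTION ufnGetGlobalSiteIdForTxns(
    PickupCompany STRING, SupplyCountry INT, TxnSource STRING,
    SiteId INT, ExternalSiteId STRING)
RETURNS INT
RETURN CASE
    WHEN TxnSource = 'CDX' AND (
            (PickupCompany = 'BP' AND SupplyCountry IN ({in_list(BP_COUNTRIES)}))
            OR PickupCompany IN ({in_list(FIRST_GROUP)}))
        THEN 0
    WHEN TxnSource = 'OAC' AND PickupCompany = 'TO' THEN 0
    WHEN TxnSource = 'OAC' THEN 0
    WHEN TxnSource = 'HEROES' THEN 0
    ELSE 0
END
"""

# On a cluster this would be executed with: spark.sql(CREATE_FUNCTION_SQL)
```

The remaining CDX branches would be added as further `WHEN` clauses in the same order as the `elif` chain, since `CASE` also stops at the first match.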
    

    If this is helpful please accept answer.

