Databricks Spark SQL: custom table-valued function + struct really slow (minutes for a single row)

Jakub K 1 Reputation point
2022-08-05T03:16:42.567+00:00

I have a custom table-valued function which takes a URL as a parameter and outputs a single-row table with certain elements of the URL extracted and labelled.

(I get search activity URLs, and when they are in a specific format I can retrieve values from the URL path and ?parameters, which I put into separate columns for analysis.)

It works, but it's extremely slow, and I think it's due to the split(parse_url(url, 'PATH')) call I do.

Here's the function:

%sql
CREATE OR REPLACE FUNCTION urlSplitter(url STRING) RETURNS TABLE(
  Url STRING,
  UrlPath STRING,
  UrlParams STRING,
  SearchState STRING,
  SearchRegion STRING,
  SearchParkName STRING,
  PropertyCode STRING,
  NoOfTokens INT
) RETURN WITH src AS (
  SELECT
    -- prepare the url into the separate components for further processing
    url,
    parse_url(url, "PATH") AS UrlPath,
    split(parse_url(url, "PATH"), '/') AS PathArray,
    split(parse_url(url, "PATH"), '/')[1] AS PathArray1,
    split(parse_url(url, "PATH"), '/')[2] AS PathArray2,
    split(parse_url(url, "PATH"), '/')[3] AS PathArray3,
    split(parse_url(url, "PATH"), '/')[4] AS PathArray4,
    parse_url(url, "QUERY") AS UrlParams,
    parse_url(url, "QUERY", "propertyCode") AS PropertyCode
)
SELECT
  s.Url,
  s.UrlPath,
  s.UrlParams,
  s.PathArray[2] AS SearchState,
  s.PathArray[3] AS SearchRegion,
  s.PathArray[4] AS SearchParkName,
  s.PropertyCode,
  size(s.PathArray) - 2 AS NoOfTokens
FROM
  src s
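To make clear what the function is meant to produce, here is a plain-Python sketch of the same extraction using only the standard library. The sample URL is hypothetical, just shaped like the search URLs described above (state/region/park path segments plus a propertyCode query parameter):

```python
from urllib.parse import urlparse, parse_qs

def url_splitter(url: str) -> dict:
    """Plain-Python illustration of what the urlSplitter SQL function extracts."""
    parsed = urlparse(url)
    # Spark's split(path, '/') produces a leading empty token for the
    # leading '/', so the first real path segment is at index 1.
    tokens = parsed.path.split("/")
    params = parse_qs(parsed.query)
    return {
        "Url": url,
        "UrlPath": parsed.path,
        "UrlParams": parsed.query,
        "SearchState": tokens[2] if len(tokens) > 2 else None,
        "SearchRegion": tokens[3] if len(tokens) > 3 else None,
        "SearchParkName": tokens[4] if len(tokens) > 4 else None,
        "PropertyCode": params.get("propertyCode", [None])[0],
        "NoOfTokens": len(tokens) - 2,
    }

# Hypothetical example URL in the expected format:
row = url_splitter("https://example.com/search/nsw/hunter-valley/some-park?propertyCode=ABC123")
```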

Now, the weird thing is that if I swap the PathArray[x] references in the final SELECT to the PathArrayX columns I create in the initial CTE, it all goes really fast.

I like to use staged CTEs so I can test parts in isolation and avoid repeated function calls. Also, this is a very simplified version of what I'm doing, even though it still exhibits the behaviour: I do more complex things later and don't want to have to paste nested split(parse_url(...)) calls all over the place in CASE statements etc.

The 'slow' function works fine for literals passed to it, but falls apart when referencing a normal table, even with a predicate that retrieves a single row by a unique id (the table is a few GB, partitioned, optimized, Z-ordered, etc.).

Slow plan:

== Physical Plan ==  
AdaptiveSparkPlan isFinalPlan=false  
+- CollectLimit 100  
   +- Project [ActivityId#41282, ActivityURL#41289, Url#41327, UrlPath#41328, UrlParams#41334, SearchState#41307, SearchRegion#41308, SearchParkName#41309, PropertyCode#41335, NoOfTokens#41311]  
      +- SortMergeJoin [coalesce(ActivityUrl#41289, ), isnull(ActivityUrl#41289)], [coalesce(ActivityUrl#41364, ), isnull(ActivityUrl#41364)], Inner  
         :- Sort [coalesce(ActivityUrl#41289, ) ASC NULLS FIRST, isnull(ActivityUrl#41289) ASC NULLS FIRST], false, 0  
         :  +- ColumnarToRow  
         :     +- PhotonResultStage  
         :        +- PhotonShuffleExchangeSource  
         :           +- PhotonShuffleMapStage  
         :              +- PhotonShuffleExchangeSink hashpartitioning(coalesce(ActivityUrl#41289, ), isnull(ActivityUrl#41289), 200)  
         :                 +- PhotonProject [ActivityID#41282, ActivityURL#41289]  
         :                    +- PhotonAdapter  
         :                       +- FileScan parquet *[ActivityID#41282,ActivityURL#41289,PartitionKey#41303] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[abfss://*.dfs.core.windows.net/*..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ActivityID:int,ActivityURL:string>  
         +- Sort [coalesce(ActivityUrl#41364, ) ASC NULLS FIRST, isnull(ActivityUrl#41364) ASC NULLS FIRST], false, 0  
            +- Exchange hashpartitioning(coalesce(ActivityUrl#41364, ), isnull(ActivityUrl#41364), 200), ENSURE_REQUIREMENTS, [id=#19269]  
               +- Project [Url#41327, UrlPath#41328, UrlParams#41334, PathArray#41329[2] AS SearchState#41307, PathArray#41329[3] AS SearchRegion#41308, PathArray#41329[4] AS SearchParkName#41309, PropertyCode#41335, (size(PathArray#41329, true) - 2) AS NoOfTokens#41311, ActivityUrl#41364]  
                  +- Project [ActivityUrl#41289 AS url#41327, parse_url(ActivityUrl#41289, PATH, false) AS UrlPath#41328, split(parse_url(ActivityUrl#41289, PATH, false), /, -1) AS PathArray#41329, parse_url(ActivityUrl#41289, QUERY, false) AS UrlParams#41334, parse_url(ActivityUrl#41289, QUERY, propertyCode, false) AS PropertyCode#41335, ActivityUrl#41289 AS ActivityUrl#41364]  
                     +- ColumnarToRow  
                        +- PhotonResultStage  
                           +- PhotonGroupingAgg(keys=[ActivityUrl#41289], functions=[])  
                              +- PhotonShuffleExchangeSource  
                                 +- PhotonShuffleMapStage  
                                    +- PhotonShuffleExchangeSink hashpartitioning(ActivityUrl#41289, 200)  
                                       +- PhotonGroupingAgg(keys=[ActivityUrl#41289], functions=[])  
                                          +- PhotonProject [ActivityURL#41289]  
                                             +- PhotonAdapter  
                                                +- FileScan parquet *[ActivityURL#41289,PartitionKey#41303] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[abfss://*.dfs.core.windows.net/*..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ActivityURL:string>  
  

Fast plan:

== Physical Plan ==  
CollectLimit 100  
+- Project [ActivityId#41482, ActivityURL#41489, ActivityUrl#41489 AS Url#41504, parse_url(ActivityUrl#41489, PATH, false) AS UrlPath#41505, parse_url(ActivityUrl#41489, QUERY, false) AS UrlParams#41506, split(parse_url(ActivityUrl#41489, PATH, false), /, 4)[2] AS SearchState#41507, split(parse_url(ActivityUrl#41489, PATH, false), /, 5)[3] AS SearchRegion#41508, split(parse_url(ActivityUrl#41489, PATH, false), /, 6)[4] AS SearchParkName#41509, parse_url(ActivityUrl#41489, QUERY, propertyCode, false) AS PropertyCode#41510, (size(split(parse_url(ActivityUrl#41489, PATH, false), /, -1), true) - 2) AS NoOfTokens#41511]  
   +- *(1) ColumnarToRow  
      +- PhotonResultStage  
         +- PhotonAdapter  
            +- FileScan parquet *[ActivityID#41482,ActivityURL#41489,PartitionKey#41503] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[abfss://*.dfs.core.windows.net/*, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ActivityID:int,ActivityURL:string>  
  
  
== Photon Explanation ==  
Photon does not fully support the query because:  
 Unsupported expression(s): parse_url(ActivityUrl#41489, PATH, false), parse_url(ActivityUrl#41489, QUERY, false), parse_url(ActivityUrl#41489, PATH, false), parse_url(ActivityUrl#41489, PATH, false), parse_url(ActivityUrl#41489, PATH, false), parse_url(ActivityUrl#41489, QUERY, propertyCode, false), parse_url(ActivityUrl#41489, PATH, false)  
reference node:  
 Project [ActivityId#41482, ActivityURL#41489, ActivityUrl#41489 AS Url#41504, parse_url(ActivityUrl#41489, PATH, false) AS UrlPath#41505, parse_url(ActivityUrl#41489, QUERY, false) AS UrlParams#41506, split(parse_url(ActivityUrl#41489, PATH, false), /, 4)[2] AS SearchState#41507, split(parse_url(ActivityUrl#41489, PATH, false), /, 5)[3] AS SearchRegion#41508, split(parse_url(ActivityUrl#41489, PATH, false), /, 6)[4] AS SearchParkName#41509, parse_url(ActivityUrl#41489, QUERY, propertyCode, false) AS PropertyCode#41510, (size(split(parse_url(ActivityUrl#41489, PATH, false), /, -1), true) - 2) AS NoOfTokens#41511]  

Is this a bug? Are there alternative approaches?

The usage is:

SELECT a.url, u.*
FROM activities a, LATERAL urlSplitter(a.url) u
WHERE id = 1
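For what it's worth, since the Photon explanation above blames parse_url specifically, one alternative I'm considering is extracting the path segments with a regular expression instead (e.g. via regexp_extract, though I haven't verified Photon's coverage of it on my runtime). A rough Python sketch of the pattern, using a hypothetical URL in the expected format, with capture groups mirroring PathArray[2..4]:

```python
import re

# scheme://host/seg1/seg2/seg3/seg4?...  -> capture path segments 2-4,
# matching the PathArray[2], [3], [4] indexes used in the SQL function.
PATH_RE = re.compile(r"^[a-z]+://[^/]+/[^/?#]*/([^/?#]*)/([^/?#]*)/([^/?#]*)")

m = PATH_RE.match("https://example.com/search/nsw/hunter-valley/some-park?propertyCode=ABC123")
state, region, park = m.groups()
```

The same pattern could then be used in SQL as regexp_extract(url, pattern, n) with group number n, avoiding parse_url entirely; this is only a sketch, not something I've benchmarked.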

Azure Databricks