Funzioni scalari definite dall'utente - Python

Questo articolo contiene esempi di funzioni definite dall'utente (UDF) python. Illustra come registrare funzioni definite dall'utente, come richiamare funzioni definite dall'utente e fornisce avvertenze sull'ordine di valutazione delle sottoespressioni in Spark SQL.

In Databricks Runtime 14.0 e versioni successive è possibile usare funzioni di tabella definite dall'utente Python per registrare funzioni che restituiscono intere relazioni anziché valori scalari. Vedere Che cosa sono le funzioni di tabella definite dall'utente python?.

Nota

In Databricks Runtime 12.2 LTS e versioni successive le funzioni definite dall'utente Python e Pandas non sono supportate nel catalogo Unity nel calcolo che usa la modalità di accesso condiviso. Le funzioni definite dall'utente Python scalari e le funzioni definite dall'utente Pandas scalari sono supportate in Databricks Runtime 13.3 LTS e versioni successive per tutte le modalità di accesso.

In Databricks Runtime 13.3 LTS e versioni successive è possibile registrare funzioni definite dall'utente Python scalari nel catalogo unity usando la sintassi SQL. Vedere Funzioni definite dall'utente (UDF) nel catalogo unity.

Registrare una funzione come funzione definita dall'utente

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Facoltativamente, è possibile impostare il tipo restituito della funzione definita dall'utente. Il tipo restituito predefinito è StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Chiamare la funzione definita dall'utente in Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Usare la funzione definita dall'utente con i dataframe

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

In alternativa, è possibile dichiarare la stessa funzione definita dall'utente usando la sintassi di annotazione:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Ordine di valutazione e controllo null

Spark SQL (incluso SQL e l'API DataFrame e Dataset) non garantisce l'ordine di valutazione delle sottoespressioni. In particolare, gli input di un operatore o di una funzione non vengono necessariamente valutati da sinistra a destra o in qualsiasi altro ordine fisso. Ad esempio, le espressioni logiche AND e OR non hanno semantica "corto circuito" da sinistra a destra.

Pertanto, è pericoloso basarsi sugli effetti collaterali o sull'ordine di valutazione delle espressioni booleane e sull'ordine delle WHERE clausole e HAVING , poiché tali espressioni e clausole possono essere riordinate durante l'ottimizzazione e la pianificazione delle query. In particolare, se una funzione definita dall'utente si basa sulla semantica di corto circuito in SQL per il controllo null, non esiste alcuna garanzia che il controllo Null venga eseguito prima di richiamare la funzione definita dall'utente. ad esempio:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Questa WHERE clausola non garantisce che la funzione definita dall'utente strlen venga richiamata dopo aver filtrato i valori Null.

Per eseguire un controllo Null appropriato, è consigliabile eseguire una delle operazioni seguenti:

  • Rendere la funzione definita dall'utente che riconosce i valori Null ed eseguire il controllo Null all'interno della funzione definita dall'utente stessa
  • Usare IF o CASE WHEN espressioni per eseguire il controllo Null e richiamare la funzione definita dall'utente in un ramo condizionale
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok