Condividi tramite


Funzioni scalari definite dall'utente - Python

Questo articolo contiene esempi di funzioni definite dall'utente (UDF) python. Illustra come registrare le UDF, come richiamare le UDF e fornisce avvertenze sull'ordine di valutazione delle sottoespressioni in Spark SQL.

In Databricks Runtime 14.0 e versioni successive è possibile usare funzioni di tabella definite dall'utente Python per registrare funzioni che restituiscono intere relazioni anziché valori scalari. Vedere Funzioni di tabella definite dall'utente python (UDF) .

Nota

In Databricks Runtime 12.2 LTS e versioni precedenti, le funzioni definite dall'utente Python e Pandas non sono supportate nelle risorse di calcolo di Unity Catalog che usano la modalità di accesso standard. Le funzioni utente scalari Python e le funzioni utente Pandas sono supportate in Databricks Runtime 13.3 LTS e versioni successive per tutte le modalità di accesso.

In Databricks Runtime 13.3 LTS e successive, è possibile registrare funzioni definite dall'utente Python scalari nel Catalogo Unity usando la sintassi SQL. Si veda Funzioni definite dall'utente (UDF) nel catalogo Unity.

Registrare una funzione come UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

È possibile facoltativamente impostare il tipo di ritorno della funzione definita dall'utente. Il tipo restituito predefinito è StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Richiama l’UDF in Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Usare le UDF con i DataFrame

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

In alternativa, è possibile dichiarare la stessa UDF usando la sintassi di annotazione.

from pyspark.sql.functions import udf

@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Ordine di valutazione e controllo del valore nullo

Spark SQL (incluso SQL e l'API DataFrame e Dataset) non garantisce l'ordine di valutazione delle sottoespressioni. In particolare, gli input di un operatore o di una funzione non vengono necessariamente valutati da sinistra a destra o in qualsiasi altro ordine fisso. Ad esempio, le espressioni logiche AND e OR non hanno la semantica di “cortocircuito” da sinistra a destra.

Pertanto, è pericoloso basarsi sugli effetti collaterali o sull'ordine di valutazione delle espressioni booleane e sull'ordine delle clausole WHERE e HAVING, poiché tali espressioni e clausole possono essere riordinate durante l'ottimizzazione e la pianificazione delle query. In particolare, se una UDF si basa sulla semantica di corto circuito in SQL per il controllo dei valori null, non c'è garanzia che il controllo venga eseguito prima di richiamare la UDF. Ad esempio,

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Questa clausola WHERE non garantisce che la UDF strlen venga invocata dopo il filtraggio dei valori nulli.

Per eseguire un controllo null appropriato, è consigliabile eseguire una delle operazioni seguenti:

  • Rendere la funzione definita dall'utente null-aware ed eseguire il controllo dei valori null all'interno di questa stessa funzione.
  • Utilizzare le espressioni IF o CASE WHEN per effettuare il controllo su valori nulli e invocare una funzione definita dall'utente all'interno di un ramo condizionale.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Ottenere il contesto di esecuzione delle attività

Usare l'API PySpark TaskContext per ottenere informazioni di contesto, ad esempio l'identità dell'utente, i tag del cluster, l'ID processo Spark e altro ancora. Vedi Ottieni il contesto del compito in una UDF.

Limiti

Le seguenti limitazioni si applicano alle UDF di PySpark.

  • Restrizioni di accesso ai file: In Databricks Runtime 14.2 e versioni precedenti le funzioni definite dall'utente PySpark nei cluster condivisi non possono accedere a cartelle Git, file dell'area di lavoro o volumi del catalogo Unity.

  • Variabili di trasmissione: le UDF PySpark nei cluster in modalità di accesso standard e nel calcolo serverless non supportano le variabili di trasmissione.

  • limite di memoria: le UDF di PySpark nel calcolo serverless hanno un limite di memoria di 1 GB per ogni UDF di PySpark. Il superamento di questo limite genera l'errore seguente: [UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.