Condividi tramite


Funzioni a valori scalari definite dall'utente - Scala

Questo articolo contiene esempi di funzioni Scala definite dall'utente (UDF). Illustra come registrare funzioni definite dall'utente (UDF), come richiamare le UDF e le avvertenze relative all'ordine di valutazione delle sotto-espressioni in Spark SQL. Per altri dettagli, vedere Funzioni scalari esterne definite dall'utente (UDF).

Nota

Le funzioni definite dall'utente Scala nelle risorse di calcolo abilitate per Unity Catalog con modalità di accesso standard (in precedenza modalità di accesso condiviso) richiedono Databricks Runtime 14.2 o versioni successive.

Registrare una funzione come UDF

val squared = (s: Long) => {
  s * s
}
spark.udf.register("square", squared)

Invocare l’UDF in Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test

Usare la UDF con DataFrames

import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))

"Ordine di valutazione e controllo dei null"

Spark SQL (inclusi SQL e le API di DataFrame e del set di dati) non garantisce l'ordine di valutazione delle sottoespressioni. In particolare, gli input di un operatore o di una funzione non vengono necessariamente valutati da sinistra a destra o in qualsiasi altro ordine fisso. Ad esempio, le espressioni logiche AND e OR non hanno semantica "corto circuito" da sinistra a destra.

Pertanto, è pericoloso basarsi sugli effetti collaterali o sull'ordine di valutazione delle espressioni booleane e sull'ordine delle clausole WHERE e HAVING, poiché tali espressioni e clausole possono essere riordinate durante l'ottimizzazione e la pianificazione delle query. In particolare, se una UDF si basa sulla semantica di corto circuito in SQL per il controllo dei valori null, non c'è garanzia che il controllo venga eseguito prima di richiamare la UDF. Ad esempio,

spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee

Questa clausola WHERE non garantisce che la UDF strlen venga richiamata dopo aver filtrato i valori null.

Per eseguire un controllo null appropriato, è consigliabile eseguire una delle operazioni seguenti:

  • Rendere la funzione definita dall'utente che riconosce i valori null ed eseguire il controllo null all'interno della funzione definita dall'utente stessa
  • Usare le espressioni IF o CASE WHEN per eseguire il controllo di null e richiamare l'UDF in un ramo condizionale
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

API del set di dati tipizzato

Nota

Questa funzionalità è supportata nei cluster abilitati per Unity Catalog con modalità di accesso standard in Databricks Runtime 15.4 e versioni successive.

Le API del set di dati tipizzato consentono di eseguire trasformazioni, ad esempio mapping, filtro e aggregazioni sui set di dati risultanti con una funzione definita dall'utente.

Ad esempio, l'applicazione Scala seguente usa l'API map() per modificare un numero in una colonna di risultato in una stringa con prefisso.

spark.range(3).map(f => s"row-$f").show()

Anche se questo esempio usa l'API map(), questo vale anche per altre API del set di dati tipizzato, ad esempio filter(), mapPartitions(), foreach(), foreachPartition(), reduce() e flatMap().

Funzionalità di UDF in Scala e compatibilità con Databricks Runtime

Le funzionalità scala seguenti richiedono versioni minime di Databricks Runtime quando vengono usate nei cluster abilitati per Unity Catalog in modalità di accesso standard (condiviso).

Caratteristica / Funzionalità Versione minima di Databricks Runtime
Funzioni definite dall'utente scalari Databricks Runtime 14.2
Dataset.map, Dataset.mapPartitions, Dataset.filter, Dataset.reduceDataset.flatMap Databricks Runtime 15.4
KeyValueGroupedDataset.flatMapGroups, KeyValueGroupedDataset.mapGroups Databricks Runtime 15.4
(Streaming) foreachWriter Sink Databricks Runtime 15.4
(Streaming) foreachBatch Databricks Runtime 16.1
(Streaming) KeyValueGroupedDataset.flatMapGroupsWithState Databricks Runtime 16.2