Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tento článek obsahuje příklady uživatelem definované funkce Scala (UDF). Ukazuje, jak zaregistrovat uživatelsky definované funkce, jak je vyvolat, a upozorňuje na výhrady ohledně pořadí vyhodnocení dílčích výrazů ve Spark SQL. Další podrobnosti najdete v tématu Externí uživatelem definované skalární funkce (UDF ).
Poznámka:
Uživatelsky definované funkce Scala na výpočetních prostředcích s podporou katalogu Unity ve standardním režimu přístupu (dříve sdílený režim přístupu) vyžadují Databricks Runtime verze 14.2 a vyšší.
Registrace funkce jako UDF
val squared = (s: Long) => {
s * s
}
spark.udf.register("square", squared)
Zavolejte UDF v Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test
Použití UDF s DataFrames
import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))
Kontrola pořadí vyhodnocení a hodnoty null
Spark SQL (včetně rozhraní SQL a rozhraní API datových rámců a datových sad) nezaručuje pořadí vyhodnocení dílčích výrazů. Zejména vstupy operátoru nebo funkce nejsou nutně vyhodnoceny zleva doprava nebo v jiném pevném pořadí. Například logické AND výrazy OR nemají sémantiku "zkratování" zleva doprava.
Proto je nebezpečné spoléhat se na vedlejší účinky nebo pořadí vyhodnocení logických výrazů, stejně jako na pořadí klauzulí WHERE a HAVING, protože během optimalizace a plánování dotazů může být pořadí těchto výrazů a klauzulí změněno. Konkrétně platí, že pokud UDF spoléhá na krátké spojení v SQL pro kontrolu hodnoty null, neexistuje žádná záruka, že se kontrola hodnoty null provede před vyvoláním UDF. Příklad:
spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee
Tato WHERE klauzule nezaručuje, že se UDF (uživatelsky definovaná funkce) strlen bude volat poté, co budou vyfiltrovány hodnoty null.
Pokud chcete provést správnou kontrolu hodnoty null, doporučujeme provést některou z následujících akcí:
- Udělejte samotnou uživatelsky definovanou funkci schopnou pracovat s hodnotami null a proveďte kontrolu na hodnoty null uvnitř samotné UDF.
- Použijte
IFneboCASE WHENvýrazy k ověření hodnoty null a pro vyvolání funkce definované uživatelem v podmíněné větvi.
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Nativně typované API pro datové sady
Poznámka:
Tato funkce je podporována v clusterech s podporou katalogu Unity se standardním režimem přístupu v Databricks Runtime 15.4 a novějším.
Rozhraní API pro typové datové sady umožňují spouštět transformace, jako je mapování, filtrování a agregace na výsledných datových sadách s uživatelsky vytvořenou funkcí.
Například následující aplikace Scala používá map() rozhraní API k úpravě čísla ve výsledném sloupci na řetězec s předponou.
spark.range(3).map(f => s"row-$f").show()
I přestože tento příklad používá rozhraní API map(), platí to také pro jiná typovaná rozhraní API datových sad, například filter(), mapPartitions(), foreach(), foreachPartition(), reduce()a flatMap().
Funkce UDF Scala a kompatibilita Databricks Runtime
Následující funkce Scala vyžadují minimální verze modulu Runtime Databricks, pokud se používají v clusterech s podporou katalogu Unity v režimu standardního (sdíleného) přístupu.
| Vlastnost | Minimální verze Databricks Runtime |
|---|---|
| Skalární funkce definované uživatelem | Databricks Runtime 14.2 |
Dataset.map, Dataset.mapPartitions, Dataset.filter, , Dataset.reduceDataset.flatMap |
Databricks Runtime 15.4 |
KeyValueGroupedDataset.flatMapGroups, KeyValueGroupedDataset.mapGroups |
Databricks Runtime 15.4 |
(Streamování) foreachWriter Sink |
Databricks Runtime 15.4 |
(Streamování) foreachBatch |
Databricks Runtime 16.1 |
(Streamování) KeyValueGroupedDataset.flatMapGroupsWithState |
Databricks Runtime 16.2 |