Skalární funkce definované uživatelem – Python
Tento článek obsahuje příklady uživatelem definované funkce Pythonu (UDF). Ukazuje, jak zaregistrovat funkce definované uživatelem, jak vyvolat funkce definované uživatelem a poskytuje upozornění na pořadí vyhodnocení dílčích výrazů ve Spark SQL.
V Databricks Runtime 14.0 a novějších můžete pomocí uživatelem definovaných tabulkových funkcí Pythonu (UDTFs) zaregistrovat funkce, které místo skalárních hodnot vracejí celé relace. Viz uživatelem definované funkce tabulek v Pythonu (UDTFs).
Poznámka:
Ve službě Databricks Runtime 12.2 LTS a níže nejsou u výpočetních prostředků katalogu Unity, které používají režim sdíleného přístupu, podporované uživatelem definované uživatelem Pythonu a uživatelem pandas. Skalární uživatelem definované funkce Pythonu a uživatelem definované funkce Pandas jsou podporovány ve službě Databricks Runtime 13.3 LTS a vyšší pro všechny režimy přístupu.
Ve službě Databricks Runtime 13.3 LTS a novějších můžete pomocí syntaxe SQL zaregistrovat skalární uživatelem definované uživatelem Pythonu do katalogu Unity. Viz uživatelem definované funkce (UDF) v katalogu Unity.
Registrace funkce jako funkce definované uživatelem
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Volitelně můžete nastavit návratový typ definovaného uživatelem. Výchozí návratový typ je StringType
.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Volání funkce definovaná uživatelem ve Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Použití UDF s datovými rámci
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Alternativně můžete deklarovat stejnou funkci definovanou uživatelem pomocí syntaxe poznámek:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Kontrola pořadí vyhodnocení a hodnoty null
Spark SQL (včetně SQL a rozhraní DATAFrame and Dataset API) nezaručuje pořadí vyhodnocení dílčích výrazů. Zejména vstupy operátoru nebo funkce nejsou nutně vyhodnoceny zleva doprava nebo v jiném pevném pořadí. Například logické AND
výrazy OR
nemají sémantiku "zkratování" zleva doprava.
Proto je nebezpečné spoléhat se na vedlejší účinky nebo pořadí vyhodnocení logických výrazů a pořadí WHERE
HAVING
a pořadí klauzulí, protože tyto výrazy a klauzule je možné změnit pořadí během optimalizace a plánování dotazů. Konkrétně platí, že pokud UDF spoléhá na sémantiku zkratování v SQL pro kontrolu hodnoty null, neexistuje žádná záruka, že se kontrola hodnoty null provede před vyvoláním funkce definované uživatelem. Příklad:
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Tato WHERE
klauzule nezaručuje, že se funkce definovaná uživatelem strlen
bude volat po vyfiltrování hodnot null.
Pokud chcete provést správnou kontrolu hodnoty null, doporučujeme provést některou z následujících akcí:
- Nastavení samotné funkce definovaná uživatelem s podporou hodnoty null a provedení kontroly hodnoty null uvnitř samotného uživatelem definovaného uživatelem
- Použití
IF
neboCASE WHEN
výrazy k ověření hodnoty null a vyvolání funkce definovaná uživatelem v podmíněné větvi
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Omezení
- Funkce definované uživatelem PySpark ve sdílených clusterech nebo bezserverových výpočetních prostředcích nemají přístup ke složkám Gitu, souborům pracovních prostorů nebo svazkům UC za účelem importu modulů ve službě Databricks Runtime 14.2 a novějších.