Sdílet prostřednictvím


Skalární funkce definované uživatelem – Python

Tento článek obsahuje příklady uživatelem definované funkce Pythonu (UDF). Ukazuje, jak zaregistrovat funkce definované uživatelem, jak vyvolat funkce definované uživatelem a poskytuje upozornění na pořadí vyhodnocení dílčích výrazů ve Spark SQL.

V Databricks Runtime 14.0 a novějších můžete pomocí uživatelem definovaných tabulkových funkcí Pythonu (UDTFs) zaregistrovat funkce, které místo skalárních hodnot vracejí celé relace. Viz uživatelem definované funkce tabulek v Pythonu (UDTFs).

Poznámka:

V Databricks Runtime 12.2 LTS a níže nejsou definované uživatelem Pythonu a UDF Pandas podporovány v katalogu Unity na výpočetních prostředcích, které používají režim sdíleného přístupu. Skalární uživatelem a skalární uživatelem definované uživatelem v Pythonu jsou podporované ve službě Databricks Runtime 13.3 LTS a vyšší pro všechny režimy přístupu.

Ve službě Databricks Runtime 13.3 LTS a novějších můžete pomocí syntaxe SQL zaregistrovat skalární uživatelem definované uživatelem Pythonu do katalogu Unity. Viz uživatelem definované funkce (UDF) v katalogu Unity.

Registrace funkce jako funkce definované uživatelem

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Volitelně můžete nastavit návratový typ definovaného uživatelem. Výchozí návratový typ je StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Volání funkce definovaná uživatelem ve Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Použití UDF s datovými rámci

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Alternativně můžete deklarovat stejnou funkci definovanou uživatelem pomocí syntaxe poznámek:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Kontrola pořadí vyhodnocení a hodnoty null

Spark SQL (včetně SQL a rozhraní DATAFrame and Dataset API) nezaručuje pořadí vyhodnocení dílčích výrazů. Zejména vstupy operátoru nebo funkce nejsou nutně vyhodnoceny zleva doprava nebo v jiném pevném pořadí. Například logické AND výrazy OR nemají sémantiku "zkratování" zleva doprava.

Proto je nebezpečné spoléhat se na vedlejší účinky nebo pořadí vyhodnocení logických výrazů a pořadí WHERE HAVING a pořadí klauzulí, protože tyto výrazy a klauzule je možné změnit pořadí během optimalizace a plánování dotazů. Konkrétně platí, že pokud UDF spoléhá na sémantiku zkratování v SQL pro kontrolu hodnoty null, neexistuje žádná záruka, že se kontrola hodnoty null provede před vyvoláním funkce definované uživatelem. Příklad:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Tato WHERE klauzule nezaručuje, že se funkce definovaná uživatelem strlen bude volat po vyfiltrování hodnot null.

Pokud chcete provést správnou kontrolu hodnoty null, doporučujeme provést některou z následujících akcí:

  • Nastavení samotné funkce definovaná uživatelem s podporou hodnoty null a provedení kontroly hodnoty null uvnitř samotného uživatelem definovaného uživatelem
  • Použití IF nebo CASE WHEN výrazy k ověření hodnoty null a vyvolání funkce definovaná uživatelem v podmíněné větvi
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Omezení

  • Funkce definované uživatelem PySpark ve sdílených clusterech nebo bezserverových výpočetních prostředcích nemají přístup ke složkám Gitu, souborům pracovních prostorů nebo svazkům UC za účelem importu modulů ve službě Databricks Runtime 14.2 a novějších.