Megosztás a következőn keresztül:


Felhasználó által definiált skaláris függvények – Python

Ez a cikk a Python felhasználó által definiált függvényeinek (UDF) példáit tartalmazza. Bemutatja, hogyan regisztrálhatja az UDF-eket, hogyan hívhatja meg az UDF-eket, és bemutatja a Spark SQL-ben az alkifejezések kiértékelési sorrendjét.

A Databricks Runtime 14.0-s és újabb verzióiban a Python felhasználó által definiált táblafüggvényeivel (UDTF-ekkel) olyan függvényeket regisztrálhat, amelyek skaláris értékek helyett teljes kapcsolatokat ad vissza. Lásd: Python felhasználó által definiált táblafüggvények (UDTF-ek).

Feljegyzés

A Databricks Runtime 12.2 LTS és újabb verziókban a Python UDF-ek és a Pandas UDF-ek nem támogatottak a Unity Katalógusban a megosztott hozzáférési módot használó számítási eszközökön. A Skaláris Python UDF-ek és a skaláris Pandas UDF-ek a Databricks Runtime 13.3 LTS-ben és annál is támogatottak az összes hozzáférési mód esetében.

A Databricks Runtime 13.3 LTS-ben és újabb verziókban skaláris Python UDF-eket regisztrálhat a Unity Catalogban SQL-szintaxissal. Lásd a felhasználó által definiált függvényeket (UDF-eket) a Unity Catalogban.

Függvény regisztrálása UDF-ként

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Igény szerint beállíthatja az UDF visszatérési típusát. Az alapértelmezett visszatérési típus a következő StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Az UDF meghívása a Spark SQL-ben

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

UDF használata DataFrame-ekkel

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Másik lehetőségként deklarálhatja ugyanezt az UDF-et széljegyzetszintaxissal:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Kiértékelési sorrend és null ellenőrzés

A Spark SQL (beleértve az SQL-t és a DataFrame- és Dataset API-t) nem garantálja az alexpressziók kiértékelésének sorrendjét. Az operátorok vagy függvények bemeneteit nem feltétlenül értékelik ki balról jobbra vagy más rögzített sorrendben. A logikai AND és OR kifejezési kifejezések például nem rendelkeznek balról jobbra "rövidzárolás" szemantikával.

Ezért veszélyes a logikai kifejezések mellékhatásaira vagy kiértékelési sorrendjére, valamint a záradékok sorrendjére WHERE HAVING támaszkodni, mivel az ilyen kifejezések és záradékok átrendezhetők a lekérdezésoptimalizálás és -tervezés során. Pontosabban, ha egy UDF az SQL rövidzárlatú szemantikára támaszkodik a nullellenőrzéshez, nincs garancia arra, hogy a null ellenőrzés az UDF meghívása előtt fog történni. Például:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Ez a WHERE záradék nem garantálja az UDF meghívását a strlen null értékek szűrése után.

A megfelelő nullellenőrzés végrehajtásához az alábbiak valamelyikét javasoljuk:

  • Állítsa az UDF-et nullérzékenysé, és végezze el a null ellenőrzést magában az UDF-ben
  • CASE WHEN Null-ellenőrzés vagy kifejezések használata IF és az UDF meghívása feltételes ágban
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Korlátozások

  • A megosztott fürtökön vagy kiszolgáló nélküli számítási feladatokon futó PySpark UDF-ek nem férnek hozzá a Git-mappákhoz, munkaterületfájlokhoz vagy UC-kötetekhez a Databricks Runtime 14.2-s és újabb moduljainak importálásához.