Megosztás:


Felhasználó által definiált függvények a Pythonhoz készült Databricks Connectben

Jegyzet

Ez a cikk a Databricks Connect for Databricks Runtime 13.3-at és újabb verzióját ismerteti.

A Databricks Connect for Python támogatja a felhasználó által definiált függvényeket (UDF). Az UDF-eket tartalmazó DataFrame-művelet végrehajtásakor az UDF-eket a Databricks Connect szerializálja, és a kérés részeként elküldi a kiszolgálónak.

A Databricks Connect for Scalához készült UDF-ekről további információt a Databricks Connect for Scala felhasználó által definiált funkcióiban talál.

Jegyzet

Mivel a felhasználó által definiált függvény szerializálva és deszerializálva van, az ügyfél Python-verziójának meg kell egyeznie az Azure Databricks-számítás Python-verziójával. A támogatott verziókért tekintse meg a verziótámogatási mátrixot.

UDF definiálása

Ha UDF-t szeretne létrehozni a Pythonhoz készült Databricks Connectben, használja az alábbi támogatott függvények egyikét:

Az alábbi Python például beállít egy egyszerű UDF-t, amely négyzetre állítja az oszlop értékeit.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

Függőségekkel rendelkező UDF-ek

Fontos

Ez a funkció megköveteli a Databricks Connect for Python 16.4 vagy annál újabb verzióit, valamint egy Databricks Runtime 16.4-et vagy annál újabb verziót futtató fürtöt, és nyilvános előnézetben érhető el. A funkció használatához engedélyezze a Továbbfejlesztett Python UDF-eket a Unity Katalógusban a munkaablakban.

A Databricks Connect támogatja az UDF-ekhez szükséges Python-függőségek megadását. Ezek a függőségek a Databricks-számításra vannak telepítve a UDF Python-környezetének részeként.

Ez a funkció lehetővé teszi a felhasználók számára, hogy az alapkörnyezetben biztosított csomagokon kívül olyan függőségeket is meghatározjanak, amelyekre az UDF-nek szüksége van. A csomag másik verziójának telepítésére is használható az alapkörnyezetben megadotttól.

A függőségi elemek a következő forrásokból telepíthetők:

  • PyPI-csomagok
    • A PyPI-csomagok a PEP 508 szerint adhatók meg, dicepéldául . pyjokes<1simplejson==3.19.*
  • Unity Catalog-kötetekben tárolt fájlok
    • A wheel csomagok (.whl) és a gzipped tar fájlok (.tar.gz) teljes mértékben támogatottak. A felhasználónak engedélyt kell kapnia READ_FILE a fájlra a re:[UC] kötetben.
    • A Unity Catalog-kötetekből származó csomagok telepítésekor az UDF-ek meghívásához a felhasználóknak engedélyre van szükségük READ VOLUME a forrásköteten. Ha ezt az engedélyt minden fiókfelhasználónak megadja, az automatikusan engedélyezi ezt az új felhasználók számára.
    • A Unity-katalógus kötetfájljait a következőképpen kell megadni: például dbfs:<path>, dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl, vagy dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.

Ha egyéni függőségeket szeretne belefoglalni a UDF-be, adja meg őket egy környezetben a withDependencies segítségével, majd ezt a környezetet használja egy Spark-munkamenet létrehozásához. A függőségek telepítve vannak a Databricks-számításra, és minden olyan UDF-ben elérhetők lesznek, amelyek ezt a Spark-munkamenetet használják.

A következő kód függőségként deklarálja a PyPI-csomagot dice :

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Vagy egy kötetben lévő kerék függőségének megadásához:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Viselkedés a Databricks-jegyzetfüzetekben és -feladatokban

Jegyzetfüzetekben és munkamenetekben az UDF-függőségeket közvetlenül a REPL-ben kell telepíteni. A Databricks Connect ellenőrzi a REPL Python-környezetet, és ellenőrzi, hogy az összes megadott függőség már telepítve van-e, és kivételt okoz, ha nincsenek telepítve.

A jegyzetfüzet-környezet érvényesítése a PyPI és a Unity Catalog kötetfüggőségei esetében is megtörténik. A kötetfüggőségeket a PEP-427-ben vagy újabb verziókban, a forrásterjesztési fájlok esetében pedig a PEP-241-ben vagy újabb verzióban megadott standard Python-csomagolási specifikációk alapján kell csomagolni. A Python csomagolási szabványokkal kapcsolatos további információkért tekintse meg a PyPA dokumentációját.

Korlátozások

  • Az olyan fájlok, mint a Python-kerék vagy a helyi fejlesztőgép forráseloszlása, nem határozhatók meg közvetlenül függőségként. Ezeket először fel kell tölteni a Unity Catalog-kötetekre.
  • Az pyspark.sql.streaming.DataStreamWriter.foreach és pyspark.sql.streaming.DataStreamWriter.foreachBatch UDF-függőségeinek támogatásához a Databricks Connect for Python 18.0 vagy annál újabb verziója, valamint egy Databricks Runtime 18.0 vagy annál újabb verzióját futtató fürt szükséges.
  • Az UDF-függőségek nem támogatottak a pandas-aggregációs UDF-ek esetében az ablakfüggvényeken keresztül.

Példák

Az alábbi példa definiálja a PyPI- és kötetfüggőségeket egy környezetben, létrehoz egy munkamenetet a környezettel, majd definiálja és meghívja azokat a függőségeket használó UDF-eket:

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Python alapkörnyezet

Az UDF-ek végrehajtása a Databricks-számításon történik, nem az ügyfélen. Az alap Python-környezet, amelyben az UDF-ek végrehajtása történik, a Databricks-számítástól függ.

Fürtök esetén az alap Python-környezet a fürtön futó Databricks Runtime-verzió Python-környezete. A Python-verzió és az alapkörnyezetben található csomagok listája a Databricks Runtime kibocsátási megjegyzéseinekRendszerkörnyezet és Telepített Python-kódtárak szakaszában található.

Kiszolgáló nélküli számítás esetén az alap Python-környezet a kiszolgáló nélküli környezet verziójának felel meg az alábbi táblázat szerint.

Databricks Connect-verzió UDF kiszolgáló nélküli környezet
17.0–17.3, Python 3.12 4-es verzió
16.4.1–17 alá, Python 3.12 3-as verzió
15.4.10–16 alá, Python 3.12 3-as verzió
15.4.10–16 alá, Python 3.11 2-es verzió
15.4.0–15.4.9 és 16.0–16.3 Legújabb kiszolgáló nélküli számítás. A stabil Python-környezethez migráljon a 15.4.10 LTS-hez és a 16.4.1 LTS-hez vagy annál magasabbra.