Sdílet prostřednictvím


Uživatelem definované funkce v Databricks Connect pro Python

Poznámka

Tento článek popisuje Databricks Connect pro Databricks Runtime 13.3 a novější.

Databricks Connect pro Python podporuje uživatelem definované funkce (UDF). Když se spustí operace datového rámce, která obsahuje funkce definované uživatelem, budou tyto funkce serializovány nástrojem Databricks Connect a odesílány na server jako součást požadavku.

Informace o uživatelsky definovaných funkcích pro Databricks Connect pro Scala najdete v tématu Uživatelsky definované funkce v Databricks Connect pro Scala.

Poznámka

Vzhledem k tomu, že uživatelem definovaná funkce je serializována a deserializována, musí verze Pythonu klienta odpovídat verzi Pythonu ve výpočetních prostředcích Azure Databricks. Podporované verze najdete v matici podpory verzí .

Definovat funkci definovanou uživatelem

Pokud chcete vytvořit UDF v Databricks Connect pro Python, použijte jednu z následujících podporovaných funkcí:

Například následující Python kód vytvoří jednoduchou uživatelsky definovanou funkci, která umocní hodnoty ve sloupci.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

Funkce definované uživatelem se závislostmi

Důležité

Tato funkce je ve verzi Public Preview a vyžaduje Databricks Connect pro Python 16.4 nebo vyšší a cluster se spuštěným Modulem Databricks Runtime 16.4 nebo novějším. Pokud chcete tuto funkci použít, povolte ve svém pracovním prostoru zobrazení náhledu rozšířené Python UDFs v Katalogu Unity.

Databricks Connect podporuje zadávání závislostí Pythonu požadovaných pro uživatelské definované funkce. Tyto závislosti se instalují do výpočetních prostředků Databricks jako součást Python prostředí uživatelsky definované funkce (UDF).

Tato funkce umožňuje uživatelům určit závislosti, které uživatelsky definovaná funkce potřebuje nad rámec balíčků poskytovaných v základním prostředí. Dá se také použít k instalaci jiné verze balíčku, než je k dispozici v základním prostředí.

Závislosti je možné nainstalovat z následujících zdrojů:

  • Balíčky PyPI
    • Balíčky PyPI lze specifikovat podle PEP 508, například dice, pyjokes<1 nebo simplejson==3.19.*.
  • Soubory uložené ve svazcích katalogu Unity
    • Podporují se balíčky Wheel (.whl) i gzipped tar soubory (.tar.gz). Uživateli musí být uděleno READ_FILE oprávnění k souboru ve svazku re:[UC].
    • Při instalaci balíčků ze svazků katalogu Unity potřebují uživatelé oprávnění READ VOLUME ke zdrojovému svazku, aby mohli vyvolat uživatelsky definované funkce (UDF). Udělení tohoto oprávnění všem uživatelům účtu automaticky aktivuje tuto funkci pro nové uživatele.
    • Soubory svazků katalogu Unity by měly být specifikovány jako dbfs:<path>, například dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl nebo dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.

Pokud chcete do funkce definované uživatelem (UDF) zahrnout vlastní závislosti, zadejte je v prostředí pomocí withDependencies, a pak toto prostředí použijte k vytvoření relace Sparku. Závislosti se nainstalují na vaše výpočetní prostředky Databricks a budou dostupné ve všech UDF (uživatelsky definovaných funkcích), které používají tuto Spark session.

Následující kód deklaruje balíček dice PyPI jako závislost:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Nebo pokud chcete určit závislost kolečka ve svazku:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Chování v poznámkových blocích a úlohách Databricks

V poznámkových blocích a úlohách musí být závislosti uživatelsky definovaných funkcí nainstalované přímo v REPL. Databricks Connect ověří prostředí Pythonu REPL tím, že ověří, že jsou už nainstalované všechny zadané závislosti, a vyvolá výjimku, pokud nejsou nainstalované.

Ověření prostředí poznámkového bloku probíhá jak pro závislosti balíčků PyPI, tak i závislosti katalogu Unity Catalog. Závislosti na objemu je třeba zabalit podle standardních specifikací balení Pythonu z PEP-427 nebo novější pro soubory ve formátu wheel a PEP-241 nebo novější pro soubory zdrojové distribuce. Další informace o standardech balení Pythonu najdete v dokumentaci k PyPA.

Omezení

  • Soubory, jako je kolo Pythonu nebo distribuce zdroje na místním vývojovém počítači, nelze zadat přímo jako závislost. Nejdříve je nutné je nahrát do svazků Unity Catalog.
  • Závislosti UDF podporují pyspark.sql.streaming.DataStreamWriter.foreach a pyspark.sql.streaming.DataStreamWriter.foreachBatch vyžadují Databricks Connect pro Python 18.0 nebo vyšší a kluster se spuštěným Databricks Runtime 18.0 nebo novějším.
  • Závislosti uživatelem definovaných funkcí nejsou podporovány pro agregace UDF knihovny pandas nad okenními funkcemi.

Příklady

Následující příklad definuje závislosti PyPI a svazků v prostředí, vytvoří relaci s tímto prostředím a pak definuje a volá uživatelem definované funkce, které tyto závislosti používají:

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Základní prostředí Pythonu

Funkce definované uživatelem se spouští na výpočetních jednotkách Databricks, nikoli na straně klienta. Základní běhové prostředí Pythonu, ve kterém se spouští uživatelsky definované funkce, závisí na výpočetních prostředcích Databricks.

Základní prostředí Pythonu je pro clustery prostředí Pythonu verze Databricks Runtime spuštěné v clusteru. Verze Pythonu a seznam balíčků v tomto základním prostředí najdete v částech Systémové prostředí a nainstalované knihovny Pythonu v poznámkách k verzi Databricks Runtime.

Základní prostředí Pythonu pro bezserverové výpočetní prostředky odpovídá verzi bezserverového prostředí podle následující tabulky.

Verze Databricks Connect Bezserverové prostředí UDF
17.0 až 17.3, Python 3.12 Verze 4
16.4.1 až pod 17, Python 3.12 Verze 3
15.4.10 na nižší než 16, Python 3.12 Verze 3
15.4.10 na nižší než 16, Python 3.11 Verze 2
15.4.0 až 15.4.9 a 16.0 až 16.3 Nejnovější výpočetní prostředí bez serveru Pokud chcete stabilní prostředí Pythonu, proveďte migraci na verzi 15.4.10 LTS a vyšší nebo 16.4.1 LTS a vyšší.