Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Poznámka
Tento článek popisuje Databricks Connect pro Databricks Runtime 13.3 a novější.
Databricks Connect pro Python podporuje uživatelem definované funkce (UDF). Když se spustí operace datového rámce, která obsahuje funkce definované uživatelem, budou tyto funkce serializovány nástrojem Databricks Connect a odesílány na server jako součást požadavku.
Informace o uživatelsky definovaných funkcích pro Databricks Connect pro Scala najdete v tématu Uživatelsky definované funkce v Databricks Connect pro Scala.
Poznámka
Vzhledem k tomu, že uživatelem definovaná funkce je serializována a deserializována, musí verze Pythonu klienta odpovídat verzi Pythonu ve výpočetních prostředcích Azure Databricks. Podporované verze najdete v matici podpory verzí .
Definovat funkci definovanou uživatelem
Pokud chcete vytvořit UDF v Databricks Connect pro Python, použijte jednu z následujících podporovaných funkcí:
- Uživatelem definované funkce PySpark
- Streamovací funkce PySpark
Například následující Python kód vytvoří jednoduchou uživatelsky definovanou funkci, která umocní hodnoty ve sloupci.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
Správa závislostí UDF
Důležité
Tato funkce je ve verzi Public Preview a vyžaduje Databricks Connect pro Python 16.4 nebo vyšší a cluster se spuštěným Modulem Databricks Runtime 16.4 nebo novějším. Pokud chcete tuto funkci použít, povolte ve svém pracovním prostoru zobrazení náhledu rozšířené Python UDFs v Katalogu Unity.
Databricks Connect podporuje zadávání závislostí Pythonu požadovaných pro uživatelské definované funkce. Tyto závislosti se instalují do výpočetních prostředků Databricks jako součást Python prostředí uživatelsky definované funkce (UDF).
Tato funkce umožňuje uživatelům určit závislosti, které uživatelsky definovaná funkce potřebuje nad rámec balíčků poskytovaných v základním prostředí. Dá se také použít k instalaci jiné verze balíčku, než je k dispozici v základním prostředí.
Závislosti je možné nainstalovat z následujících zdrojů:
- Balíčky PyPI
- Balíčky PyPI lze specifikovat podle PEP 508, například
dice,pyjokes<1nebosimplejson==3.19.*.
- Balíčky PyPI lze specifikovat podle PEP 508, například
- Balíčky uložené ve svazcích katalogu Unity
- Podporují se jak sestavená distribuce (
.whl) tak zdrojová distribuce (.tar.gz). - Balíčky svazků katalogu Unity lze zadat jako
dbfs:<path>, napříkladdbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whlnebodbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz. - Uživateli musí být uděleno
READ_FILEoprávnění k souboru ve svazku re:[UC]. Udělení tohoto oprávnění všem uživatelům účtu automaticky aktivuje tuto funkci pro nové uživatele.
- Podporují se jak sestavená distribuce (
- Místní balíčky, složky a soubory Pythonu
- Místní sestavené distribuce (
.whl), zdrojové distribuce (.tar.gz), složky a soubory Pythonu lze zadat jakolocal:<path>, například:local:/path/to/my_private_dep-3.20.2-py3-none-any.whl,local:/path/to/my_private_dep-4.0.0.tar.gz,local:/path/to/my_folder,local:/path/to/my_file.py. - Podporují se absolutní i relativní cesty, například:
local:/path/to/my_file.pynebolocal:./path/to/my_file.py.
- Místní sestavené distribuce (
Pokud chcete do funkce definované uživatelem (UDF) zahrnout vlastní závislosti, zadejte je v prostředí pomocí withDependencies, a pak toto prostředí použijte k vytvoření relace Sparku. Závislosti se nainstalují na vaše výpočetní prostředky Databricks a budou dostupné ve všech UDF (uživatelsky definovaných funkcích), které používají tuto Spark session.
Následující kód deklaruje balíček dice PyPI jako závislost:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Nebo pokud chcete určit závislost kolečka ve svazku:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Chování v poznámkových blocích a úlohách Databricks
V poznámkových blocích a úlohách musí být závislosti uživatelsky definovaných funkcí nainstalované přímo v REPL. Databricks Connect ověří prostředí Pythonu REPL tím, že ověří, že jsou už nainstalované všechny zadané závislosti, a vyvolá výjimku, pokud nejsou nainstalované. Ověřování prostředí poznámkového bloku se provádí pro závislosti objemu PyPI a Unity Catalog, ale nikoli pro místní závislosti.
Omezení
- Podpora závislostí UDF pro
pyspark.sql.streaming.DataStreamWriter.foreachvyžaduje Databricks Connect pro Python 18.0 nebo novější a cluster se spuštěným Databricks Runtime 18.0 nebo novějším. - Podpora závislostí UDF pro
pyspark.sql.streaming.DataStreamWriter.foreachBatchvyžaduje Databricks Connect pro Python 18.0 nebo vyšší a cluster běžící na Databricks Runtime 18.0 nebo vyšší. Tato funkce není podporována v bezserverové verzi. - Podpora závislostí UDF pro místní balíčky, složky a soubory Pythonu vyžaduje Databricks Connect pro Python 18.1 nebo novější a cluster se spuštěným Modulem Databricks Runtime 18.1 nebo novějším.
- Závislosti uživatelem definovaných funkcí nejsou podporovány pro agregace UDF knihovny pandas nad okenními funkcemi.
- Svazky katalogu Unity a místní balíčky musí být zabaleny podle standardních specifikací balení Pythonu z PEP-427 nebo novější pro distribuce typu wheel a PEP-241 nebo novější pro tar zdrojové distribuce. Další informace o standardech balení Pythonu najdete v dokumentaci k PyPA.
Příklady
Následující příklad definuje závislosti PyPI a svazků v prostředí, vytvoří relaci s tímto prostředím a pak definuje a volá uživatelem definované funkce, které tyto závislosti používají:
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]
local_deps = [
# Example library from: https://pypi.org/project/simplejson/#files
"local:./test/simplejson-3.20.2-py3-none-any.whl",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Automatická správa závislostí UDF
Důležité
Tato funkce je ve verzi Public Preview a vyžaduje Databricks Connect pro Python 18.1 nebo novější, Python 3.12 na místním počítači a cluster se spuštěným Modulem Databricks Runtime 18.1 nebo novějším. Pokud chcete tuto funkci použít, povolte ve svém pracovním prostoru zobrazení náhledu rozšířené Python UDFs v Katalogu Unity.
Rozhraní Databricks Connect withAutoDependencies() API umožňuje automatické zjišťování a nahrávání místních modulů a veřejně dostupných závislostí z PyPI používaných v importních příkazech ve vašich UDF. Odebere potřebu ručně zadat závislosti.
Následující kód umožňuje automatickou správu závislostí:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Metoda withAutoDependencies() přijímá následující parametry:
-
upload_local: Pokud je tato možnost nastavená naTrue, místní moduly importované ve vašich uživatelem definovaných funkcích se automaticky zjistí, zabalí a nahrají do sandboxu definovaného uživatelem. -
use_index: Pokud je tato možnost nastavená naTrue, veřejné závislosti PyPI používané ve vašich uživatelsky definovaných funkcích se automaticky rozpoznají a nainstalují do výpočetních prostředků Azure Databricks. Proces zjišťování používá nainstalované balíčky na místním počítači k určení verzí a zajištění konzistence mezi místním prostředím a prostředím vzdáleného spuštění.
Omezení
- Dynamické importy (například
importlib.import_module("foo")) nejsou podporovány. - Balíčky jmenných prostorů (například
azure.eventhubagoogle.cloud.aiplatform) nejsou podporovány. - Závislosti nainstalované pomocí odkazů s přímou adresou URL se nepodporují. To zahrnuje ty, které jsou instalované z místních wheel souborů.
- Závislosti nainstalované z indexů privátních balíčků se nepodporují. Balíčky nainstalované tímto způsobem nelze odlišit od balíčků nainstalovaných z veřejného PyPI.
- Zjišťování závislostí nefunguje v prostředí Pythonu. Podporují se jenom skripty Pythonu, prostředí IPython a poznámkové bloky Jupyter.
Příklady
Následující příklad ukazuje automatickou správu závislostí s místními moduly i balíčky PyPI. Tento příklad vyžaduje, abyste měli nainstalovány simplejson a dice (pomocí pip install simplejson dice).
Nejprve vytvořte místní pomocné moduly:
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
Pak v hlavním skriptu naimportujte uvedené moduly a použijte je ve funkcích UDF.
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
Logování
Chcete-li vytvořit výstup zjištěných závislostí, nastavte proměnnou SPARK_CONNECT_LOG_LEVEL prostředí na info hodnotu nebo debug. Případně nakonfigurujte architekturu protokolování Pythonu:
import logging
logging.basicConfig(level=logging.INFO)
Relevantní protokoly jsou generovány modulem databricks.connect.auto_dependencies, například:
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Základní prostředí Pythonu
Funkce definované uživatelem se spouští na výpočetních jednotkách Databricks, nikoli na straně klienta. Základní běhové prostředí Pythonu, ve kterém se spouští uživatelsky definované funkce, závisí na výpočetních prostředcích Databricks.
Základní prostředí Pythonu je pro clustery prostředí Pythonu verze Databricks Runtime spuštěné v clusteru. Verze Pythonu a seznam balíčků v tomto základním prostředí najdete v částech Systémové prostředí a nainstalované knihovny Pythonu v poznámkách k verzi Databricks Runtime.
Základní prostředí Pythonu pro bezserverové výpočetní prostředky odpovídá verzi bezserverového prostředí podle následující tabulky. Verze Databricks Connect, které nejsou uvedené v této tabulce, zatím nepodporují bezserverovou podporu nebo nedosáhly konce podpory. Podívejte se na matici podpory verzí a verze Databricks Connect, které již nejsou podporovány.
| Verze Databricks Connect | Bezserverové prostředí UDF |
|---|---|
| 18.0, Python 3.12 | Verze 5 |
| 17.2 až 17.3, Python 3.12 | Verze 4 |
| 16.4.1 až pod 17, Python 3.12 | Verze 3 |
| Verze 15.4.10 až nižší než 16, Python 3.12 | Verze 3 |
| 15.4.10 na nižší než 16, Python 3.11 | Verze 2 |