注意
本文涵蓋適用於 Databricks Runtime 13.3 和更新版本的 Databricks Connect。
適用於 Python 的 Databricks Connect 支援使用者定義的函式 (UDF)。 執行包含 UDF 的 DataFrame 作業時,UDF 會由 Databricks Connect 串行化,並在要求中傳送至伺服器。
如需適用於 Scala 之 Databricks Connect 的 UDF 相關信息,請參閱 Databricks Connect for Scala 中的使用者定義函式。
注意
由於使用者定義的函式已串行化和還原串行化,因此用戶端的 Python 版本必須符合 Azure Databricks 計算上的 Python 版本。 如需支援的版本,請參閱 版本支援矩陣。
定義UDF
若要在 Databricks Connect for Python 中建立 UDF,請使用下列其中一個支援的函式:
- PySpark 使用者定義函式
- PySpark 串流函式
例如,下列 Python 設定了一個簡單的 UDF,用於將欄中的值進行平方運算。
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
管理 UDF 依賴關係
這很重要
這項功能處於 公開預覽 狀態,需要適用於 Python 16.4 或更新版本且執行 Databricks Runtime 16.4 或更新版本之叢集的 Databricks Connect。 若要使用此功能,請在工作區啟用預覽Unity Catalog 中增強的 Python UDF。
Databricks Connect 支援指定 UDF 所需的 Python 相依性。 這些相依性會安裝在 Databricks 計算上,做為 UDF Python 環境的一部分。
此功能可讓使用者指定 UDF 除了基底環境中提供的套件以外,還需要的相依性。 它也可以用來安裝與 基底環境中提供的不同套件版本。
您可以從下列來源安裝相依性:
- PyPI 套件
- 您可以根據 PEP 508 指定 PyPI 套件, 例如 、
dicepyjokes<1或simplejson==3.19.*。
- 您可以根據 PEP 508 指定 PyPI 套件, 例如 、
- 儲存在 Unity 目錄卷中的套件
- 支援建置版(
.whl)與原始碼發行版(.tar.gz)。 - Unity Catalog 卷的套件可以指定為
dbfs:<path>,例如dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl或dbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz。 - 用戶必須獲
READ_FILE授與 re:[UC] 磁碟區中檔案的許可權。 將此許可權授與所有帳戶使用者,會自動為新使用者啟用此許可權。
- 支援建置版(
- 本地套件、資料夾與 Python 檔案
- 本地建置發行版(
.whl)、原始碼發行版(.tar.gz)、資料夾及 Python 檔案可指定為local:<path>,例如:local:/path/to/my_private_dep-3.20.2-py3-none-any.whl、、local:/path/to/my_private_dep-4.0.0.tar.gzlocal:/path/to/my_folderlocal:/path/to/my_file.py、 。 - 支持絕對路徑與相對路徑,例如:
local:/path/to/my_file.py或local:./path/to/my_file.py。
- 本地建置發行版(
若要在 UDF 中包含自定義相依性,請使用 withDependencies在環境中指定它們,然後使用該環境來建立 Spark 會話。 相依性會安裝在您的 Databricks 計算上,而且可在使用此 Spark 會話的所有 UDF 中使用。
下列程式代碼會將 PyPI 套件 dice 宣告為相依性:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
或者,若要指定磁碟區中轉輪的相依性:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Databricks 筆記本與作業中的行為
在筆記本和作業中,UDF 相依性必須直接安裝在 REPL 中。 Databricks Connect 會驗證 REPL Python 環境,方法是確認已安裝所有指定的相依性,並在未安裝任何相依性時擲回例外狀況。 筆記本環境針對 PyPI 和 Unity Catalog 的卷相依性執行檢查,但不包括本地相依性。
局限性
- UDF
pyspark.sql.streaming.DataStreamWriter.foreach相依支援需要 Databricks Connect for Python 18.0 或以上版本,以及執行 Databricks Runtime 18.0 或以上版本的叢集。 - UDF
pyspark.sql.streaming.DataStreamWriter.foreachBatch相依支援需要 18.0 版或以上的 Databricks Connect for Python,以及執行 18.0 版或以上的 Databricks Runtime 叢集。 此功能在無伺服器模式中不支援。 - UDF 相依性支援本地套件、資料夾和 Python 檔案,這需要使用 Databricks Connect 並且需符合 Python 18.1 或以上版本,以及運行 Databricks Runtime 18.1 或以上版本的叢集。
- UDF 相依性不支援於 pandas 聚合 UDF 在視窗函數上的應用。
- Unity Catalog 仓库套件和本地套件必须根据标准的 Python 打包规范进行打包,具体来说,轮式分发版需要符合PEP-427或更新版本的要求,而 tar 源代码分发版则需要符合PEP-241或更新版本的要求。 如需 Python 封裝標準的詳細資訊,請參閱 PyPA 檔。
範例
下列範例定義了環境中的 PyPI 和磁碟區的依賴項,創建了一個基於該環境的會話,然後定義並調用使用這些依賴項的使用者定義函數(UDF):
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]
local_deps = [
# Example library from: https://pypi.org/project/simplejson/#files
"local:./test/simplejson-3.20.2-py3-none-any.whl",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
UDF 相依性自動管理
這很重要
此功能目前為 公開預覽 版,需 Databricks Connect 支援 Python 18.1 或以上版本,本地機器需支援 Python 3.12,並需叢集運行 Databricks Runtime 18.1 或以上版本。 若要使用此功能,請在工作區啟用預覽Unity Catalog 中增強的 Python UDF。
Databricks Connect withAutoDependencies() API 允許自動發現並上傳本地模組及用於 UDF 匯入語句中的公開 PyPI 相依關係。 這樣就不用手動指定相依性了。
以下程式碼可實現自動相依管理:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
此 withAutoDependencies() 方法接受以下參數:
-
upload_local: 當設定為True時,匯入 UDF 中的本地模組會自動被發現、打包並上傳到 UDF 沙盒。 -
use_index: 當設定為True時,用於 UDF 的公開 PyPI 相依關係會自動被發現並安裝在 Azure Databricks 運算中。 發現過程會利用你本地機器上已安裝的套件來決定版本,確保本地環境與遠端執行環境之間的一致性。
局限性
- 動態匯入(例如)
importlib.import_module("foo")不支援。 - 不支援命名空間套件(例如
azure.eventhub和google.cloud.aiplatform)。 - 不支援使用直接 URL 參考安裝的相依關係。 這包括從本地 Wheel 檔案安裝的檔案。
- 不支援從私人套件索引安裝的依賴項。 以這種方式安裝的套件無法與公開 PyPI 安裝的套件區分。
- 相依性發現在 Python shell 裡是行不通的。 僅支援 Python 腳本、IPython shell 和 Jupyter Notebook。
範例
以下範例展示了本地模組與 PyPI 套件的自動相依管理。 此範例要求你已安裝 simplejson 且 dice (使用 pip install simplejson dice)。
首先,建立本地輔助模組:
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
接著,在你的主腳本中匯入這些模組,並在 UDF 中使用:
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
森林伐木業
要輸出發現的依賴,請將環境變數設 SPARK_CONNECT_LOG_LEVEL 為 info 或 debug。 或者,設定 Python 日誌框架:
import logging
logging.basicConfig(level=logging.INFO)
相關日誌由 databricks.connect.auto_dependencies 模組輸出,例如:
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Python 基底環境
UDF 會在 Databricks 計算上執行,而不是在用戶端上執行。 執行 UDF 的基底 Python 環境取決於 Databricks 計算。
針對叢集,基底 Python 環境是叢集上執行的 Databricks Runtime 版本的 Python 環境。 此基底環境中的 Python 版本和套件清單位於 Databricks Runtime 版本資訊的[系統環境] 和 [已安裝的 Python 連結庫] 區段底下。
針對無伺服器計算,基底 Python 環境會根據下表對應至 無伺服器環境版本 。 未列在本表中的 Databricks Connect 版本,要麼尚未支援無伺服器,要麼已結束支援。 請參閱 版本支援矩陣 及 Databricks Connect 的支援終止版本。
| Databricks Connect 版本 | UDF 無伺服器環境 |
|---|---|
| 18.0,Python 3.12 | 版本 5 |
| 17.2 到 17.3,Python 3.12 | 版本 4 |
| 16.4.1 到 17 以下,Python 3.12 | 第3版 |
| 15.4.10 至小於 16,Python 3.12 | 第3版 |
| 15.4.10 到以下 16,Python 3.11 | 第 2 版 |