本文包含 Python 使用者定義函式 (UDF) 範例。 它示範如何註冊 UDF、如何叫用 UDF,並提供 Spark SQL 中子運算式評估順序的注意事項。
在 Databricks Runtime 14.0 和更新版本中,您可以使用 Python 使用者定義數據表函式 (UDF) 來註冊傳回整個關聯而非純量值的函式。 請參閱 Python 使用者定義資料表函式 (UDF)。
備註
在 Databricks Runtime 12.2 LTS 和以下版本中,使用標準存取模式的 Unity 目錄計算不支援 Python UDF 和 Pandas UDF。 Databricks Runtime 13.3 LTS 和更新版本支援純量 Python UDF 和 Pandas UDF,適用於所有存取模式。
在 Databricks Runtime 13.3 LTS 和更新版本中,您可以使用 SQL 語法向 Unity 目錄註冊純量 Python UDF。 請參閱 Unity 目錄中 使用者定義函式 (UDF)。
將函式註冊為 UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
您可以選擇性地設定 UDF 的傳回類型。 預設傳回型態為 StringType。
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
在 Spark SQL 中呼叫 UDF
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
使用 UDF 搭配 DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
或者,您可以使用註釋語法來宣告相同的 UDF:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
與 UDF 的變體
變體的 PySpark 型態為 VariantType ,值為型別 VariantVal。 如需變體的相關資訊,請參閱 查詢變體資料。
from pyspark.sql.types import VariantType
# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
return VariantVal.parseJson(jsonString)
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
| {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
return {"v": VariantVal.parseJson(jsonString)}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
| {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
return [VariantVal.parseJson(jsonString)]
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
| [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
| toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+
計算順序與空值檢查
Spark SQL(包括 SQL 和 DataFrame 和數據集 API)不保證子表達式的評估順序。 特別是,運算子或函式的輸入不一定以左至右或任何其他固定順序進行評估。 例如,邏輯 AND 和 OR 運算式沒有由左至右的「短路」語法。
因此,依賴布林運算式評估的副作用或順序,以及 WHERE 和 HAVING 子句的順序是危險的做法,因為這類運算式和子句可以在查詢最佳化和規劃期間重新排序。 具體來說,如果 UDF 依賴 SQL 中的短路語意進行空值檢查,則無法保證在調用 UDF 之前會進行空值檢查。 例如,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
此 WHERE 子句不保證在過濾掉 Null 值之後會叫用 strlen UDF。
若要執行適當的 Null 檢查,建議您執行下列其中一項:
- 將 UDF 本身設計為具備 Null 感知能力,並在 UDF 本身內執行 Null 檢查。
- 使用
IF或CASE WHEN運算式執行 Null 檢查,並在條件分支中叫用 UDF
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
純量 Python UDF 中的服務認證
純量 Python UDF 可以使用 Unity 目錄服務認證,安全地存取外部雲端服務。 這適用於將雲端式令牌化、加密或秘密管理等作業直接整合到數據轉換中。
純量 Python UDF 的服務認證僅支援 SQL 倉儲和一般計算。
備註
純量 Python UDF 中的服務認證需要 Databricks Runtime 17.1 和更新版本。
若要建立服務認證,請參閱 建立服務認證。
備註
服務認證的 UDF 特定 API:
在 UDF 中,用於 databricks.service_credentials.getServiceCredentialsProvider() 存取服務認證。
這與在筆記本中使用的dbutils.credentials.getServiceCredentialsProvider()函式不同,該函式在 UDF 執行環境中不可用。
若要存取服務認證,請使用 databricks.service_credentials.getServiceCredentialsProvider() UDF邏輯中的公用程式,以適當的認證初始化雲端 SDK。 所有程式代碼都必須封裝在UDF主體中。
@udf
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
服務憑證權限
UDF 的建立者必須具有 Unity 目錄服務認證的 ACCESS 許可權。
在 No-PE 範圍中執行的UDF,也稱為專用叢集,需要服務認證的MANAGE許可權。
預設認證
在純量 Python UDF 中使用時,Databricks 會自動使用來自計算環境變數的預設服務認證。 此行為可讓您安全地參考外部服務,而不需在 UDF 程式代碼中明確管理認證別名。 請參閱 指定計算資源的預設服務認證
默認認證支援僅適用於標準和專用存取模式叢集。 它無法在 DBSQL 中使用。
您必須安裝azure-identity套件才能使用DefaultAzureCredential提供者。 若要安裝套件,請參閱 筆記本範圍的 Python 連結庫 或 計算範圍連結庫。
@udf
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
獲取任務執行上下文
使用 TaskContext PySpark API 來獲取上下文信息,如使用者身份、叢集標籤、Spark 工作 ID 等。 請參閱在 UDF 中取得任務內容。
限制
下列限制適用於 PySpark UDF:
檔案存取限制: 在 Databricks Runtime 14.2 和以下版本上,共用叢集上的 PySpark UDF 無法存取 Git 資料夾、工作區檔案或 Unity 目錄磁碟區。
廣播變數: 標準存取模式叢集和無伺服器計算上的 PySpark UDF 不支持廣播變數。
服務認證: 服務認證僅適用於 Batch Unity 目錄 Python UDF 和純量 Python UDF。 標準 Unity 目錄 Python UDF 不支持它們。
服務認證:服務認證僅在使用無伺服器環境第 3 版或更新版本時,在無伺服器運算中可用。 請參閱 無伺服器環境版本。
- 無伺服器記憶體限制:無伺服器計算上的 PySpark UDF 每個 PySpark UDF 的記憶體限制為 1GB。 超過此限制會導致UDF_PYSPARK_USER_CODE_ERROR.MEMORY_LIMIT_SERVERLESS 類型的錯誤。
- 標準存取模式的記憶體限制:標準存取模式上的 PySpark UDF 會根據所選實例類型的可用記憶體,具有記憶體限制。 超過可用的記憶體會導致類型為 UDF_PYSPARK_USER_CODE_ERROR.MEMORY_LIMIT 的錯誤。