這很重要
這項功能目前處於 公開預覽版。
Batch Unity Catalog Python UDF 能夠擴展 Unity Catalog UDF 的功能,允許您編寫 Python 代碼來批量處理數據,大幅提高效率,減少逐行 UDF 相關的開銷。 优化使Unity Catalog批量Python UDFs非常适合大规模数据处理。
需求
批次 Unity Catalog Python UDFs 需要 Databricks Runtime 版本 16.3 及以上。
建立批次 Unity Catalog 的 Python 使用者自定義函數 (UDF)
批次 Unity Catalog Python UDF 的建立方式類似於一般 Unity Catalog UDF 的建立方式,只需增加以下幾項:
-
PARAMETER STYLE PANDAS: 指定此UDF使用pandas迭代器批量處理數據。 -
HANDLER 'handler_function':這指定了用來處理批次的處理函數。
以下範例展示如何建立批次 Unity Catalog Python UDF:
%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
註冊 UDF 之後,您可以使用 SQL 或 Python 來呼叫它:
SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);
批次 UDF 處理器功能
批次處理的 Unity Catalog Python UDF 需要一個處理批次並產生結果的處理函數。 當您使用 HANDLER 索引鍵建立 UDF 時,您必須指定處理程式函式的名稱。
處理程序的功能如下所示:
- 接受一個迭代器參數,該參數可以迭代一個或多個
pandas.Series。 每個pandas.Series包含UDF的輸入參數。 - 逐一查看產生器並處理數據。
- 返回generator iterator。
Batch Unity 目錄 Python UDF 必須傳回與輸入相同的數據列數目。 處理器函數透過在每個批次中生成與輸入序列相同長度的 pandas.Series 來確保這一點。
安裝自定義相依性
您可以通過為外部庫設定自定義依賴項,擴展批次Unity目錄Python UDF的功能,使其超越Databricks執行環境。
請參閱 使用自訂相依性擴充 UDF。
批次 UDFs 可以接受單一或多個參數
單一參數:當處理函數使用單一輸入參數時,將接收一個迭代器,該迭代器對每個批次上的pandas.Series進行迭代。
%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for value_batch in batch_iter:
d = {"min": value_batch.min(), "max": value_batch.max()}
yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;
多個參數: 對於多個輸入參數,處理函式會接收一個迭代器,此迭代器用於遍歷多個pandas.Series。 數列中的值與輸入參數的順序相同。
%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for p1, p2 in batch_iter: # same order as arguments above
yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);
通過分離高成本操作以優化效能
你可以通過將這些計算量大的操作從處理程序中分離出來來優化它們。 這確保它們只執行一次,而不是在每次迭代處理數據批次時執行。
以下範例顯示如何確保高耗費的計算僅執行一次:
%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
# expensive computation...
return 1
expensive_value = compute_value()
def handler_func(batch_iter):
for batch in batch_iter:
yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL
隔離界限和安全性
備註
共用隔離環境需要 Databricks Runtime 17.1 和更新版本。 在舊版中,所有 Batch Unity 目錄 Python UDF 都會以嚴格的隔離模式執行。
具有相同擁有者的 Batch Unity 目錄 Python UDF 預設可以共用隔離環境。 這可藉由減少需要啟動的不同環境數目來改善效能並減少記憶體使用量。
嚴格隔離
若要確保 UDF 一律在自己的完全隔離環境中執行,請新增 STRICT ISOLATION 特性子句。
大部分的 UDF 不需要嚴格的隔離。 標準數據處理 UDF 受益於預設的共用隔離環境,並以較低的記憶體耗用量更快執行。
將 STRICT ISOLATION 特性子句新增至 UDF,以便:
- 使用
eval()、exec()或類似的函式將輸入作為代碼運行 - 將檔案寫入本機檔案系統
- 修改全域變數或系統狀態
- 存取或修改環境變數
下列範例顯示執行輸入為程式代碼且需要嚴格隔離的 UDF:
CREATE OR REPLACE TEMPORARY FUNCTION eval_string(input STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
STRICT ISOLATION
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for code_series in batch_iter:
def eval_func(code):
try:
return str(eval(code))
except Exception as e:
return f"Error: {e}"
yield code_series.apply(eval_func)
$$;
Batch Unity 目錄 Python UDF 中的服務認證
批量 Unity Catalog Python UDF 可以使用 Unity Catalog 服務憑證來存取外部雲端服務。 這在將安全標記器等雲端功能整合到數據處理工作流程中特別有用。
備註
服務認證的 UDF 特定 API:
在 UDF 中,用於 databricks.service_credentials.getServiceCredentialsProvider() 存取服務認證。
這與在筆記本中使用的dbutils.credentials.getServiceCredentialsProvider()函式不同,該函式在 UDF 執行環境中不可用。
若要建立服務認證,請參閱 建立服務認證。
指定您想要在 UDF 定義中的 子句中使用的 CREDENTIALS 服務認證:
CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
`credential-name` DEFAULT,
`complicated-credential-name` AS short_name,
`simple-cred`,
cred_no_quotes
)
AS $$
# Python code here
$$;
服務憑證權限
UDF 的創建者必須在 Unity Catalog 服務憑證上擁有 ACCESS 權限。 然而,對於 UDF 呼叫者而言,只需為他們授予 UDF 的 EXECUTE 權限即可。 特別是,UDF 呼叫者不需要訪問底層服務憑證,因為 UDF 是使用 UDF 創建者的憑證權限來執行的。
對於臨時函數,創建者始終是調用者。 在No-PE 範圍內運行的 UDF(使用者自定義函數),也稱為專用集群,則改用呼叫者的權限。
預設憑證和別名
您可以在 子句中包含 CREDENTIALS 多個認證,但只能將認證標示為 DEFAULT。 您可以使用AS關鍵字為非預設憑證設定別名。 每個憑證必須有唯一的別名。
修補過的雲端 SDK 會自動獲取預設憑證。 默認認證優先於計算 Spark 組態中指定的任何預設值,並保存在 Unity 目錄 UDF 定義中。
您必須安裝azure-identity套件才能使用DefaultAzureCredential提供者。 使用ENVIRONMENT子句來安裝外部函式庫。 如需了解更多關於安裝外部函式庫的資訊,請參閱使用自訂相依性擴展 UDF。
服務認證範例 - Azure Functions
下列範例會使用服務認證,從批次 Unity 目錄 Python UDF 呼叫 Azure 函式:
%sql
CREATE OR REPLACE FUNCTION main.test.call_azure_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
dependencies = '["azure-identity", "requests"]', environment_version = 'None'
)
AS $$
import json
import pandas as pd
import requests
from azure.identity import DefaultAzureCredential
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# DefaultAzureCredential automatically uses the DEFAULT credential
credential = DefaultAzureCredential()
token = credential.get_token("https://management.azure.com/.default")
# Azure Function URL
function_url = "https://your-function-app.azurewebsites.net/api/HashValuesFunction"
# Propagate TaskContext information:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = {
"values": vals.to_list(),
"is_debug": bool(is_debug[0]),
"context": user_ctx
}
headers = {
"Authorization": f"Bearer {token.token}",
"Content-Type": "application/json"
}
response = requests.post(function_url, json=payload, headers=headers)
if response.status_code != 200:
raise Exception(f"Function call failed: {response.text}")
response_data = response.json()
if "errorMessage" in response_data:
raise Exception(str(response_data))
yield pd.Series(response_data["values"])
$$;
在註冊後呼叫 UDF。
SELECT main.test.call_azure_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
獲取任務執行上下文
使用 TaskContext PySpark API 來獲取上下文信息,如使用者身份、叢集標籤、Spark 工作 ID 等。 請參閱在 UDF 中取得任務內容。
設定 DETERMINISTIC 以確保您的函數產生一致的結果
如果函數定義對相同的輸入產生相同的輸出,則在您的函數定義中新增 DETERMINISTIC。 這允許查詢優化以提高效能。
根據預設,除非明確宣告,否則會假設批次 Unity Catalog 的 Python UDTF 為非確定性。 非確定性函數的範例包括:產生隨機值、存取目前時間或日期,或進行外部 API 呼叫。
請參閱 CREATE FUNCTION (SQL 和 Python)
局限性
- Python 函式必須獨立處理
NULL值,而且所有類型對應都必須遵循 Azure Databricks SQL 語言對應。 - 批次 Unity Catalog Python UDFs 在一個安全、隔離的環境中運行,無法存取共享文件系統或內部服務。
- 在一個階段中,對多次 UDF(使用者自定義函數)的調用是串行化的,中間結果會被實體化,並可能溢出至磁碟。
- 服務認證僅適用於 Batch Unity Catalog Python 使用者定義函數 (UDF) 和純量 Python 使用者定義函數 (UDF)。 標準 Unity 目錄 Python UDF 不支持它們。
- 在專用集群和臨時功能中,函數調用者必須擁有
ACCESS服務憑證的許可權。 請參閱 授權使用服務憑證以訪問外部雲端服務。 - 啟用公開預覽功能在工作區的 [預覽] 頁面中啟用無伺服器 SQL 倉儲中 UDF 的網路,以對無伺服器 SQL 倉儲計算上的外部服務進行批次 Unity 目錄 Python UDF 呼叫。
- 若要在無伺服器筆記本或作業計算上呼叫 Batch Unity 目錄 Python UDF,您必須設定 無伺服器輸出控制項