Unity Catalog 中的 Batch Python 用户定义函数 (UDF)

重要

此功能目前以公共预览版提供。

通过允许您编写 Python 代码来批量处理数据,批量 Unity Catalog Python UDF 扩展了 Unity Catalog UDF 的功能。通过减少逐行 UDF 所带来的开销,这显著提高了效率。 这些优化使 Unity 目录批处理 Python UDF 非常适合大规模数据处理。

要求

Batch Unity Catalog Python UDF 需要 Databricks Runtime 版本 16.3 及更高版本。

创建 Batch Unity Catalog Python UDF

创建 Batch Unity 目录 Python UDF 类似于创建常规 Unity 目录 UDF,并添加以下内容:

  • PARAMETER STYLE PANDAS:这指定 UDF 使用 pandas 迭代器批量处理数据。
  • HANDLER 'handler_function':这指定调用以处理批处理的处理程序函数。

以下示例演示如何创建 Batch Unity 目录 Python UDF:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

注册 UDF 后,可以使用 SQL 或 Python 调用它:

SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
  SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
  SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);

批量 UDF 处理器函数

Batch Unity Catalog Python UDF 需要处理批并生成结果的处理程序函数。 使用 HANDLER 键创建 UDF 时,必须指定处理程序函数的名称。

处理程序函数执行以下任务:

  1. 接受循环访问一个或多个 pandas.Series 的迭代器参数。 每个 pandas.Series 参数都包含 UDF 的输入参数。
  2. 循环访问生成器并处理数据。
  3. 返回 生成器迭代器

批量 Unity Catalog Python UDF 返回的行数必须与输入相同。 处理程序函数通过为每个批生成一个与输入序列长度相同的 pandas.Series 来确保这一点。

安装自定义依赖项

可以通过定义外部库的自定义依赖项,将 Batch Unity 目录 Python UDF 的功能扩展到 Databricks Runtime 环境之外。

请参阅 使用自定义依赖项扩展 UDF

Batch UDF 可以接受单个或多个参数

单个参数:当处理程序函数使用单个输入参数时,它会收到循环访问每个批的 的迭代器pandas.Series

%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for value_batch in batch_iter:
    d = {"min": value_batch.min(), "max": value_batch.max()}
    yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;

多个参数: 对于多个输入参数,处理程序函数接收一个迭代器,用于迭代pandas.Series中的多个元素。 序列中的值与输入参数的顺序相同。

%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for p1, p2 in batch_iter: # same order as arguments above
    yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);

通过区分贵重的操作来优化性能

可以通过将这些计算成本高昂的操作与处理函数分离来进行优化。 这可确保它们仅执行一次,而不是在每次对批处理数据进行迭代期间执行。

以下示例演示如何确保仅执行一次昂贵的计算:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
  # expensive computation...
  return 1

expensive_value = compute_value()
def handler_func(batch_iter):
  for batch in batch_iter:
    yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL

Batch Unity Catalog Python UDF 中的服务凭据

Batch Unity 目录 Python UDF 可以使用 Unity 目录服务凭据访问外部云服务。 这对于将安全令牌化器等云函数集成到数据处理工作流中特别有用。

若要创建服务凭据,请参阅 “创建服务凭据”。

指定要在 UDF 定义中的 CREDENTIALS 子句中使用的服务凭据:

CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
  `credential-name` DEFAULT,
  `complicated-credential-name` AS short_name,
  `simple-cred`,
  cred_no_quotes
)
AS $$
# Python code here
$$;

服务凭据权限

UDF 创建者必须具有 ACCESS 对 Unity Catalog 服务凭据的权限。 但是,对于 UDF 调用方,只需授予他们对 UDF 的 EXECUTE 权限就足够了。 具体而言,UDF 调用方不需要访问基础服务凭据,因为 UDF 使用 UDF 创建者的凭据权限执行。

对于临时函数,创建者始终是调用者。 在 No-PE 范围内运行的 UDF(也称为专用群集)改用调用方的权限。

默认凭据和别名

可以在子句中包含 CREDENTIALS 多个凭据,但只能将一个凭据标记为 DEFAULT。 可以使用关键字对非默认凭据 AS 进行别名。 每个凭证必须具有唯一的别名。

已打补丁的云 SDK 会自动获取默认凭据。 默认凭据优先于计算的 Spark 配置中指定的任何默认值,并保留在 Unity 目录 UDF 定义中。

必须安装包 azure-identity 才能使用 DefaultAzureCredential 提供程序。 使用 ENVIRONMENT 子句安装外部库。 若要详细了解如何安装外部库,请参阅 使用自定义依赖项扩展 UDF

%sql
CREATE OR REPLACE TEMPORARY FUNCTION call_lambda_func(data STRING) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
  dependencies = '["azure-identity", "azure-mgmt-web"]', environment_version = 'None'
)
AS $$
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient

def batchhandler(it):
  # DefaultAzureCredential is automatically using 'batch-udf-service-creds-example-cred'
  web_client = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)

  for vals in data:
    yield vals
$$

服务凭据示例 - AWS Lambda 函数

以下示例使用服务凭据从 Batch Unity 目录 Python UDF 调用 AWS Lambda 函数:

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

注册 UDF 后调用它:

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

获取任务执行上下文

使用 TaskContext PySpark API 获取上下文信息,例如用户的标识、群集标记、spark 作业 ID 等。 请参阅 UDF 中的“获取任务上下文”。

局限性

  • Python 函数必须独立处理 NULL 值,并且所有类型映射都必须遵循 Azure Databricks SQL 语言映射。
  • Batch Unity 目录 Python UDF 在安全隔离的环境中运行,并且无权访问共享文件系统或内部服务。
  • 在一个阶段中,多个 UDF 调用被序列化,中间结果会被保存并可能溢出到磁盘。
  • 服务凭据仅在 Batch Unity 目录 Python UDF 中可用。 标准 Unity 目录 Python UDF 或 PySpark UDF 不支持它们。
  • 在专用群集和临时函数上,函数调用方必须具有 ACCESS 服务凭据的权限。 请参阅 授予使用服务凭据访问外部云服务的权限
  • 启用公共预览功能 :在无服务器 SQL 仓库中为 UDF 启用网络 ,以便对无服务器 SQL 仓库计算上的外部服务进行 Batch Unity 目录 Python UDF 调用。
  • 若要在无服务器笔记本或作业计算上进行 Batch Unity Catalog Python UDF 调用,必须配置 无服务器出口控制