什么是用户定义函数 (UDF)?
用户定义函数(UDF)允许重复使用和共享在 Azure Databricks 上扩展内置功能的代码。 使用 UDF 执行特定任务,例如复杂计算、转换或自定义数据操作。
注意
在具有共享访问模式的群集上,Databricks Runtime 13.3 LTS 及更高版本支持 Python 标量 UDF,而 Databricks Runtime 14.2 及更高版本中支持 Scala UDF。
可以使用 Databricks Runtime 13.3 LTS 及更高版本中的 SQL 语法在 Unity 目录中注册 Python 标量 UDF。 请参阅 Unity Catalog 中的用户定义函数 (UDF)。
在什么情况下使用 UDF?
将 UDF 用于难以通过内置 Apache Spark 函数表达的逻辑。 内置的 Apache Spark 函数针对分布式处理进行了优化,通常可大规模提供更好的性能。 有关详细信息,请参阅函数。
Databricks 建议使用 UDF 进行临时查询、手动数据清理、探索性数据分析,以及针对中小型数据集的操作。 UDF 的常见用例包括数据加密和解密、哈希处理、JSON 分析和验证。
使用 Apache Spark 方法处理非常大的数据集以及定期或持续运行的任何工作负荷,包括 ETL 作业和流式处理操作。
已注册和会话范围的 UDF
使用 SQL 创建的 UDF 在 Unity 目录中注册并具有关联的权限,而在笔记本中创建的 UDF 是基于会话的,并且范围限定为当前的 SparkSession。
可以使用 Azure Databricks 支持的任何语言定义和访问基于会话的 UDF。 UDF 可以是标量或非标量。
注意
目前,只有在 Unity 目录中注册的 SQL 和 Python 标量 UDF 在 DBSQL 中可用。
标量 UDF
标量 UDF 对单个行进行操作,并为每个行返回单个值。 以下示例使用标量 UDF 计算列中每个名称 name
的长度,并在新列中 name_length
添加值:
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
若要使用 PySpark 在 Databricks 笔记本中实现此目的:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
有关详细信息,请参阅 Unity 目录中 的用户定义函数(UDF)和 用户定义的标量函数 - Python。
用户定义的聚合函数 (UDAF)
用户定义的聚合函数(UDAF)对多个行进行操作,并返回单个聚合结果。 在以下示例中,定义了聚合分数的 UDAF。
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
请参阅 pandas 用户定义函数 ,了解 Python 和 用户定义的聚合函数 - Scala。
Python 用户定义表函数 (UDTF)
重要
此功能目前以公共预览版提供。
注意
Databricks Runtime 14.3 LTS 及更高版本中提供了 Python UDF。
Python 用户定义的表函数(UDDF)可以为每个输入行返回多个行和列。 在以下示例中,分数列中的每一个值都对应于类别列表。 定义 UDTF 以将逗号分隔列表拆分为多个行。 请参阅 Python 用户定义的表函数(UDF)
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="score: int, categories: string, name: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: float, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
# Apply the UDTF
result_df = df.select(ScoreCategoriesUDTF(df.score, df.categories, df.name))
display(result_df)
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
性能注意事项
- 内置函数 和 SQL UDF 是可用的最有效的选项。
- Scala UDF 在 Java 虚拟机(JVM)中执行时通常速度更快,并避免将数据移入和移出 JVM 的开销。
- Python UDF 和 Pandas UDF 往往比 Scala UDF 慢,因为它们需要序列化数据并将其移出 JVM 到 Python 解释器。 Pandas UDF 的速度比 Python UDF 快 100 倍,因为它们使用 Apache Arrow 来降低序列化成本。