用户定义的标量函数 - Scala

本文包含 Scala 用户定义的函数 (UDF) 示例。 其中介绍了如何注册 UDF、如何调用 UDF 以及有关 Spark SQL 中子表达式计算顺序的注意事项。 有关详细信息,请参阅用户定义的标量函数 (UDF)

备注

具有共享访问模式的已启用 Unity Catalog 的计算资源上的 Scala UDF 需要 Databricks Runtime 14.2 及更高版本。

将函数注册为 UDF

val squared = (s: Long) => {
  s * s
}
spark.udf.register("square", squared)

在 Spark SQL 中调用 UDF

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test

将 UDF 与数据帧配合使用

import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))

计算顺序和 NULL 检查

Spark SQL(包括 SQL、数据帧和数据集 API)不保证子表达式的计算顺序。 具体而言,运算符或函数的输入不一定按从左到右或任何其他固定顺序进行计算。 例如,逻辑 ANDOR 表达式没有从左到右的“短路”语义。

因此,依赖于布尔表达式计算的副作用或顺序以及 WHEREHAVING 子句的顺序是危险的,因为在查询优化和规划过程中,这些表达式和子句可能重新排序。 具体而言,如果 UDF 依赖于 SQL 中的短路语义进行 NULL 检查,则不能保证在调用 UDF 之前执行 NULL 检查。 例如,

spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee

这个 WHERE 子句并不保证在筛选掉 NULL 后调用 strlen UDF。

若要执行正确的 NULL 检查,建议执行以下操作之一:

  • 使 UDF 自身能够识别 NULL,在 UDF 自身内部进行 NULL 检查
  • 使用 IFCASE WHEN 表达式来执行 NULL 检查并在条件分支中调用 UDF
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

类型化数据集 API

备注

Databricks Runtime 15.4 及更高版本中具有共享访问模式的已启用 Unity Catalog 的群集支持此功能。

使用类型化数据集 API 可以通过用户定义的函数对生成的数据集运行映射、筛选和聚合等转换。

例如,以下 Scala 应用程序使用 map() API 将结果列中的数字修改为前缀字符串。

spark.range(3).map(f => s"row-$f").show()

虽然此示例使用 map() API,但这也适用于其他类型化数据集 API,例如 filter()mapPartitions()foreach()foreachPartition()reduce()flatMap()