共用方式為


使用者定義的純量函式 - Scala

本文包含 Scala 使用者定義函式 (UDF) 範例。 本文示範如何註冊 UDF、如何叫用 UDF,以及有關 Spark SQL 中子運算式的計算順序的注意事項。 如需詳細資訊,請參閱外部使用者定義純量函式 (UDF)

注意

啟用 Unity Catalog 並使用標準存取模式的計算資源上的 Scala UDF(以前稱為共用存取模式)需要 Databricks Runtime 14.2 及以上版本。

將函式註冊為 UDF

val squared = (s: Long) => {
  s * s
}
spark.udf.register("square", squared)

在 Spark SQL 中呼叫 UDF

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test

使用 UDF 與 DataFrames 搭配

import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))

評估順序和 Null 檢查

Spark SQL(包括 SQL、DataFrame 和資料集 API)不保證子運算式的評估順序。 特別是,運算子或函式的輸入不一定以左至右或任何其他固定順序進行評估。 例如,邏輯 ANDOR 運算式沒有由左至右的「短路」語意。

因此,依賴布林運算式評估的副作用或順序,以及 WHEREHAVING 子句的順序是危險的做法,因為這類運算式和子句可以在查詢最佳化和規劃期間重新排序。 具體來說,如果 UDF 依賴 SQL 中的短路語意進行空值檢查,則無法保證在調用 UDF 之前會進行空值檢查。 例如,

spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee

WHERE 子句不保證在篩選出空值後會叫用 strlen UDF。

若要執行適當的 Null 檢查,建議您執行下列其中一項:

  • 使 UDF 本身具備 Null 感知能力,並在內部進行 Null 檢查。
  • 使用 IFCASE WHEN 運算式執行 Null 檢查,並在條件分支中叫用 UDF
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

型別化資料集 API

注意

在 Databricks Runtime 15.4 和更新版本具有標準存取模式的 Unity 目錄啟用叢集上支援此功能。

具類型的資料集 API 可讓您使用使用者定義的函數,在產生的資料集上執行轉換,例如地圖、篩選和匯總。

例如,下列 Scala 應用程式會使用 map() API 將結果資料行中的數字修改為前置字串。

spark.range(3).map(f => s"row-$f").show()

雖然此範例使用 map() API,但這也適用於其他具類型的資料集 API,例如 filter()mapPartitions()foreach()foreachPartition()reduce()flatMap()

Scala UDF 功能和 Databricks 執行環境兼容性

下列 Scala 功能在標準(共用)存取模式中使用啟用 Unity Catalog 的叢集時,需要最低 Databricks 執行環境版本。

特徵 / 功能 Minimimum Databricks Runtime 版本
純量 UDF Databricks Runtime 14.2(Databricks 執行環境 14.2)
Dataset.mapDataset.mapPartitionsDataset.filterDataset.reduceDataset.flatMap Databricks 執行環境 15.4
KeyValueGroupedDataset.flatMapGroupsKeyValueGroupedDataset.mapGroups Databricks 執行環境 15.4
(串流) foreachWriter Sink Databricks 執行環境 15.4
(串流) foreachBatch Databricks Runtime 16.1
(串流) KeyValueGroupedDataset.flatMapGroupsWithState Databricks Runtime 16.2