本文包含 Scala 使用者定義函式 (UDF) 範例。 本文示範如何註冊 UDF、如何叫用 UDF,以及有關 Spark SQL 中子運算式的計算順序的注意事項。 如需詳細資訊,請參閱外部使用者定義純量函式 (UDF)。
注意
啟用 Unity Catalog 並使用標準存取模式的計算資源上的 Scala UDF(以前稱為共用存取模式)需要 Databricks Runtime 14.2 及以上版本。
將函式註冊為 UDF
val squared = (s: Long) => {
s * s
}
spark.udf.register("square", squared)
在 Spark SQL 中呼叫 UDF
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test
使用 UDF 與 DataFrames 搭配
import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))
評估順序和 Null 檢查
Spark SQL(包括 SQL、DataFrame 和資料集 API)不保證子運算式的評估順序。 特別是,運算子或函式的輸入不一定以左至右或任何其他固定順序進行評估。 例如,邏輯 AND
和 OR
運算式沒有由左至右的「短路」語意。
因此,依賴布林運算式評估的副作用或順序,以及 WHERE
和 HAVING
子句的順序是危險的做法,因為這類運算式和子句可以在查詢最佳化和規劃期間重新排序。 具體來說,如果 UDF 依賴 SQL 中的短路語意進行空值檢查,則無法保證在調用 UDF 之前會進行空值檢查。 例如,
spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee
此 WHERE
子句不保證在篩選出空值後會叫用 strlen
UDF。
若要執行適當的 Null 檢查,建議您執行下列其中一項:
- 使 UDF 本身具備 Null 感知能力,並在內部進行 Null 檢查。
- 使用
IF
或CASE WHEN
運算式執行 Null 檢查,並在條件分支中叫用 UDF
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
型別化資料集 API
注意
在 Databricks Runtime 15.4 和更新版本具有標準存取模式的 Unity 目錄啟用叢集上支援此功能。
具類型的資料集 API 可讓您使用使用者定義的函數,在產生的資料集上執行轉換,例如地圖、篩選和匯總。
例如,下列 Scala 應用程式會使用 map()
API 將結果資料行中的數字修改為前置字串。
spark.range(3).map(f => s"row-$f").show()
雖然此範例使用 map()
API,但這也適用於其他具類型的資料集 API,例如 filter()
、mapPartitions()
、foreach()
、foreachPartition()
、reduce()
和flatMap()
。
Scala UDF 功能和 Databricks 執行環境兼容性
下列 Scala 功能在標準(共用)存取模式中使用啟用 Unity Catalog 的叢集時,需要最低 Databricks 執行環境版本。
特徵 / 功能 | Minimimum Databricks Runtime 版本 |
---|---|
純量 UDF | Databricks Runtime 14.2(Databricks 執行環境 14.2) |
Dataset.map 、Dataset.mapPartitions 、Dataset.filter 、Dataset.reduce 、Dataset.flatMap |
Databricks 執行環境 15.4 |
KeyValueGroupedDataset.flatMapGroups 、KeyValueGroupedDataset.mapGroups |
Databricks 執行環境 15.4 |
(串流) foreachWriter Sink |
Databricks 執行環境 15.4 |
(串流) foreachBatch |
Databricks Runtime 16.1 |
(串流) KeyValueGroupedDataset.flatMapGroupsWithState |
Databricks Runtime 16.2 |