重要
這項功能在 Databricks Runtime 14.3 LTS 和更高版本中提供公開預覽。
使用者定義的數據表函式 (UDTF) 可讓您註冊傳回數據表而非純量值的函式。 不同於從每個呼叫傳回單一結果值的純量函式,每個 UDTF 都會在 SQL 語句的 FROM 子句中叫用,並傳回整個數據表做為輸出。
每個 UDTF 呼叫都可以接受零個或多個自變數。 這些自變數可以是代表整個輸入數據表的純量表達式或數據表自變數。
UDTF 可以透過兩種方式註冊:
- Unity 目錄:將 UDTF 註冊為 Unity 目錄中的受控管物件。 請參閱 Unity 目錄中的 Python 使用者定義資料表函式 (UDTF)。
- 工作階段範圍:註冊到本機
SparkSession,隔離到目前的筆記本或工作。
小提示
Databricks 建議在 Unity 目錄中註冊 UDTF,以利用集中式治理,讓使用者和小組之間更安全地共用和重複使用函式。
基本 UDTF 語法
Apache Spark 將 Python UDTF 實作為具有強制性 eval 方法的 Python 類別,該方法使用 yield 來發出輸出數據列。
若要使用類別作為 UDTF,您必須匯入 PySpark udtf 函式。 Databricks 建議使用這個函式作為裝飾器,並使用 returnType 選項明確指定欄位名稱和類型(除非類別定義了一個 analyze 方法,如稍後章節所述)。
下列 UDTF 會使用兩個整數自變數的固定清單來建立資料表:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
註冊 UDTF
若要註冊工作階段範圍的 UDTF 以用於 SQL 查詢,請使用 spark.udtf.register()。 提供 SQL 函式和 Python UDTF 類別的名稱。
spark.udtf.register("get_sum_diff", GetSumDiff)
呼叫已註冊的 UDTF
註冊之後,您可以使用 %sql magic 命令或 spark.sql() 函式,在 SQL 中使用 UDTF:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);").show()
%sql
SELECT * FROM get_sum_diff(1,2);
將工作階段範圍的 UDTF 升級至 Unity 目錄
您可以將工作階段範圍的 UDTF 升級至 Unity 目錄,以利用集中式治理,並更輕鬆地在使用者和小組之間安全地共用和重複使用函式。
若要將工作階段範圍的 UDTF 升級至 Unity 目錄,請搭配陳述 CREATE OR REPLACE FUNCTION 式使用 SQL DDL。 下列範例示範如何 GetSumDiff 將 UDTF 從工作階段範圍函式轉換成 Unity 目錄函式:
CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
$$;
SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13 | 7 |
+-----+------+
如需 Unity 目錄 UDTF 的詳細資訊,請參閱 Unity 目錄中的 Python 使用者定義資料表函式 (UDTF)。
使用 Apache Arrow
如果您的 UDTF 收到少量的數據做為輸入,但會輸出大型數據表,Databricks 建議使用 Apache Arrow。 您可以在宣告 UDTF 時指定 useArrow 參數來啟用它:
@udtf(returnType="c1: int, c2: int", useArrow=True)
可變參數清單 - *args 和 **kwargs
您可以使用 Python *args 或 **kwargs 語法,並實作邏輯來處理未指定的輸入值數目。
下列範例會傳回相同的結果,同時明確檢查自變數的輸入長度和類型:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
以下是相同的範例,但使用關鍵詞自變數:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
在註冊時定義靜態架構
UDTF 會傳回包含數據行名稱和型別排序序列之輸出架構的數據列。 如果所有查詢的 UDTF 架構一律保持不變,您可以在 @udtf 裝飾項目之後指定靜態、固定的架構。 它必須是 StructType:
StructType().add("c1", StringType())
或代表結構類型的 DDL 字串:
c1: string
在函數調用時間計算動態架構
UDDF 也可以根據輸入自變數的值,以程式設計方式計算每個呼叫的輸出架構。 若要這樣做,請定義一個稱為 analyze 的靜態方法,以接受與特定 UDTF 呼叫的自變數相對應的零個或多個參數。
analyze 方法的每個自變數都是 AnalyzeArgument 類別的實例,其中包含下列欄位:
AnalyzeArgument 類別欄位 |
描述 |
|---|---|
dataType |
輸入參數的類型作為 DataType。 對於輸入資料表的參數,這是 StructType 代表資料表欄位的。 |
value |
輸入參數的值作為 Optional[Any]。 對於非常數的表格參數或文字純量參數而言,這是 None。 |
isTable |
輸入參數是否為一個表格作為 BooleanType。 |
isConstantExpression |
輸入參數是否為可作為 BooleanType 的常數折疊表示式。 |
方法 analyze 會傳回一個 AnalyzeResult 類別的實例,其中包含結果表的架構作為 StructType,以及若干選擇性欄位。 如果 UDTF 接受輸入表格參數,則 AnalyzeResult 也可以包含一種要求的方法,以分割和排序輸入表格的行,如稍後所述。
AnalyzeResult 類別欄位 |
描述 |
|---|---|
schema |
結果資料表的架構為 StructType。 |
withSinglePartition |
是否要將所有輸入數據列傳送至與 BooleanType相同的 UDTF 類別實例。 |
partitionBy |
如果設定為非空白,分割表達式的每個唯一值組合的所有數據列都會由UDTF類別的個別實例取用。 |
orderBy |
如果設定為非空白,此設定會指定每個分區內資料列的排序。 |
select |
如果設定為非空,這是一個運算式序列,由UDTF指定,並由Catalyst對輸入 TABLE 引數中的數據列進行評估。 UDTF 會按照清單中名稱列出的順序,接收每個名稱的一項輸入屬性。 |
本 analyze 範例會針對輸入字串自變數中的每個單字傳回一個輸出數據行。
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
MyUDTF(lit("hello world")).columns
['word_0', 'word_1']
將狀態轉送至未來的 eval 呼叫
analyze 方法可作為執行初始化的便利位置,然後將結果轉送至相同 UDTF 呼叫的未來 eval 方法調用。
若要這樣做,請建立 AnalyzeResult 的子類別,並從 analyze 方法傳回子類別的實例。
然後,將額外的自變數新增至 __init__ 方法,以接受該實例。
此 analyze 範例會傳回常數輸出架構,但在未來 __init__ 方法呼叫所取用的結果元數據中新增自定義資訊:
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
產生輸出數據列
eval 方法會針對輸入數據表自變數的每個數據列執行一次(如果未提供任何數據表自變數,則只執行一次),後面接著最後一個叫用 terminate 方法。 方法會透過產生元組、清單或 pyspark.sql.Row 物件,輸出符合結果模式的零行或多行資料。
此範例會透過提供一個含有三個元素的元組來傳回資料列:
def eval(self, x, y, z):
yield (x, y, z)
您也可以省略括弧:
def eval(self, x, y, z):
yield x, y, z
新增尾端逗號以傳回僅有一欄的資料列:
def eval(self, x, y, z):
yield x,
您也可以產生 pyspark.sql.Row 物件。
def eval(self, x, y, z):
from pyspark.sql.types import Row
yield Row(x, y, z)
此範例會使用 Python 清單,從 terminate 方法產生輸出資料列。 您可以從 UDTF 評估中的先前步驟,將此狀態儲存在 類別內,以達到此目的。
def terminate(self):
yield [self.x, self.y, self.z]
將純量自變數傳遞至UDTF
您可以將純量參數作為常值表達式傳遞至 UDTF,此常值表達式由文字值或基於文字值的函式組成。 例如:
SELECT * FROM get_sum_diff(1, y => 2)
將數據表自變數傳遞至UDTF
除了純量輸入自變數之外,Python UDF 還可以接受輸入數據表做為自變數。 單一 UDTF 也可以接受數據表自變數和多個純量自變數。
然後,任何 SQL 查詢都可以使用 TABLE 關鍵詞提供輸入數據表,後面接著括在適當數據表標識符周圍的括弧,例如 TABLE(t)。 或者,您可以傳遞資料表子查詢,例如 TABLE(SELECT a, b, c FROM t) 或 TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))。
然後,輸入表格自變數會表示為 pyspark.sql.Row 方法的 eval 自變數,並針對輸入表格中的每一行呼叫 eval 方法。 您可以使用標準的 PySpark 欄位註解來處理每個資料列中的欄位。 下列範例示範明確匯入 PySpark Row 類型,然後篩選 id 欄位上傳遞的數據表:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
若要查詢函式,請使用 TABLE SQL 關鍵詞:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
從函數調用中的輸入行確定分區方法
使用數據表自變數呼叫 UDTF 時,任何 SQL 查詢都可以根據一或多個輸入資料表數據行的值,將輸入數據表分割成數個 UDTF 呼叫。
若要指定分割區,請在 PARTITION BY 自變數之後,在函數調用中使用 TABLE 子句。
這保證了具有分割列之值的每個唯一組合的所有輸入資料行,僅會由UDTF類別的一個實例取用。
請注意,除了簡單的欄位參考之外,PARTITION BY 子句也可以根據輸入表格的欄位接受任意表達式。 例如,您可以指定字串的 LENGTH、從日期擷取月份,或串連兩個值。
您也可以指定 WITH SINGLE PARTITION,而不是 PARTITION BY,以僅要求一個分割區,其中所有輸入數據行必須由 UDTF 類別的一個且僅有一個實例取用。
在每個分割區中,您可以選擇性地指定輸入數據列的必要順序,因為 UDTF 的 eval 方法會取用它們。 若要這樣做,請在上述 ORDER BY 或 PARTITION BY 子句後面提供 WITH SINGLE PARTITION 子句。
例如,請考慮以下的 UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
您可以在透過多種方式透過輸入表格呼叫 UDTF 時指定分割選項:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8);
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
從 analyze 方法指定輸入列的分區
請注意,針對上述每一種在 SQL 查詢中呼叫 UDTF 時分割輸入資料表的方法,UDTF 的analyze 方法可以自動指定相同的分割方式。
- 而不是將UDTF呼叫為
SELECT * FROM udtf(TABLE(t) PARTITION BY a),您可以更新analyze方法來設定欄位partitionBy=[PartitioningColumn("a")],並簡單地使用SELECT * FROM udtf(TABLE(t))來呼叫該函式。 - 同樣地,您無需在 SQL 查詢中指定
TABLE(t) WITH SINGLE PARTITION ORDER BY b,而是可以讓analyze設定欄位withSinglePartition=true和orderBy=[OrderingColumn("b")],然後只需傳遞TABLE(t)。 - 您可以讓
TABLE(SELECT a FROM t)設定analyze,然後只傳遞select=[SelectedColumn("a")],而不是在 SQL 查詢中傳遞TABLE(t)。
在下列範例中,analyze 傳回常數輸出架構、從輸入數據表中選取數據行的子集,並指定根據 date 數據行的值,將輸入數據表分割成數個 UDTF 呼叫:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add("longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word)",
alias="length_word")])