分享方式:


Python 使用者定義資料表函式 (UDDF)

重要

這項功能在 Databricks Runtime 14.3 LTS 和更新版本中處於 公開預覽 狀態。

使用者定義的數據表函式 (UDTF) 可讓您註冊傳回數據表而非純量值的函式。 不同於從每個呼叫傳回單一結果值的純量函式,每個 UDTF 都會在 SQL 語句的 FROM 子句中叫用,並傳回整個數據表做為輸出。

每個 UDTF 呼叫都可以接受零個或多個自變數。 這些自變數可以是代表整個輸入數據表的純量表達式或數據表自變數。

基本 UDTF 語法

Apache Spark 會使用用來發出輸出數據列的強制 eval 方法 yield ,將 Python UDF 實作為 Python 類別。

若要使用類別作為 UDTF,您必須匯入 PySpark udtf 函式。 Databricks 建議使用這個函式作為裝飾專案,並使用 選項明確指定功能變數名稱和類型 returnType (除非類別定義 analyze 如後續章節中所述的方法)。

下列 UDTF 會使用兩個整數自變數的固定清單來建立資料表:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

註冊 UDTF

UDF 會註冊到本機 SparkSession ,並在筆記本或作業層級隔離。

您無法將 UDDF 註冊為 Unity 目錄中的物件,而 UDDF 不能與 SQL 倉儲搭配使用。

您可以將 UDTF 註冊至目前的 SparkSession ,以便搭配 函式 spark.udtf.register()在 SQL 查詢中使用。 提供 SQL 函式和 Python UDTF 類別的名稱。

spark.udtf.register("get_sum_diff", GetSumDiff)

呼叫已註冊的 UDTF

註冊之後,您可以使用 magic 命令或spark.sql()函式,在 SQL %sql 中使用 UDTF:

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

使用 Apache 箭頭

如果您的 UDTF 收到少量的數據做為輸入,但會輸出大型數據表,Databricks 建議使用 Apache Arrow。 您可以在宣告 UDTF 時指定 useArrow 參數來啟用它:

@udtf(returnType="c1: int, c2: int", useArrow=True)

變數自變數清單 - *args 和 **kwargs

您可以使用 Python *args**kwargs 語法並實作邏輯來處理未指定的輸入值數目。

下列範例會傳回相同的結果,同時明確檢查自變數的輸入長度和類型:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

以下是相同的範例,但使用關鍵詞自變數:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

在註冊時定義靜態架構

UDTF 會傳回包含數據行名稱和型別排序序列之輸出架構的數據列。 如果所有查詢的 UDTF 架構應該一律保持不變,您可以在裝飾項目之後 @udtf 指定靜態、固定的架構。 必須是 StructType

StructType().add("c1", StringType())

或代表結構類型的 DDL 字串:

c1: string

在函數調用時間計算動態架構

UDDF 也可以根據輸入自變數的值,以程式設計方式計算每個呼叫的輸出架構。 若要這樣做,請定義稱為 analyze 的靜態方法,以接受零個或多個參數,這些參數會對應至提供給特定 UDTF 呼叫的自變數。

方法的每個 analyze 自變數都是 類別的 AnalyzeArgument 實例,其中包含下列欄位:

AnalyzeArgument 類別欄位 描述
dataType 輸入自變數的類型為 DataType。 對於輸入資料表自變數,這是 StructType 代表資料表資料行的 。
value 輸入自變數的值做為 Optional[Any]。 這是 None 針對非常數的數據表自變數或常值純量自變數。
isTable 輸入自變數是否為數據表 BooleanType
isConstantExpression 輸入自變數是否為常數折疊表示式作為 BooleanType

方法 analyze 會傳回 類別的 AnalyzeResult 實例,其中包含結果數據表的架構作為 StructType 加一些選擇性字段。 如果 UDTF 接受輸入數據表自變數,則 AnalyzeResult 也可以包含分割和排序數個 UDTF 呼叫之輸入數據表數據列的要求方式,如下所述。

AnalyzeResult 類別欄位 描述
schema 結果資料表的架構為 StructType
withSinglePartition 是否要將所有輸入數據列傳送至與 BooleanType相同的 UDTF 類別實例。
partitionBy 如果設定為非空白,分割表達式的每個唯一值組合的所有數據列都會由UDTF類別的個別實例取用。
orderBy 如果設定為非空白,這會指定每個數據分割內數據列的順序。
select 如果設定為非空白,這是UDTF針對輸入TABLE自變數中資料行進行評估的UDTF所指定的表達式序列。 UDTF 會依列出的順序,接收清單中每個名稱的一個輸入屬性。

analyze 範例會針對輸入字串自變數中的每個單字傳回一個輸出數據行。

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

將狀態轉送至未來的 eval 呼叫

方法 analyze 可作為執行初始化的便利位置,然後將結果轉送至相同 UDTF 呼叫的未來 eval 方法調用。

若要這樣做,請建立的 AnalyzeResult 子類別,並從方法傳回子類別 analyze 的實例。 然後,將額外的自變數新增至 __init__ 方法,以接受該實例。

analyze 範例會傳回常數輸出架構,但會在結果元數據中新增自定義資訊,以便未來 __init__ 方法呼叫取用:

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

產生輸出數據列

方法 eval 會針對輸入數據表自變數的每個數據列執行一次(如果未提供任何數據表自變數,則只執行一次),後面接著最後一個叫用 terminate 方法。 方法會藉由產生元組、清單或對象,輸出符合結果架構的零個或多個 pyspark.sql.Row 數據列。

此範例會藉由提供三個元素的 Tuple 來傳回資料列:

def eval(self, x, y, z):
  yield (x, y, z)

您也可以省略括弧:

def eval(self, x, y, z):
  yield x, y, z

新增尾端逗號以傳回只有一個資料行的數據列:

def eval(self, x, y, z):
  yield x,

您也可以產生 pyspark.sql.Row 物件。

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

此範例會使用 Python 清單,從 terminate 方法產生輸出數據列。 您可以從 UDTF 評估中的先前步驟,將此狀態儲存在 類別內,以達到此目的。

def terminate(self):
  yield [self.x, self.y, self.z]

將純量自變數傳遞至UDTF

您可以將純量自變數傳遞至 UDTF,作為常值或函式所根據的常值或函式組成的常數運算式。 例如:

SELECT * FROM udtf(42, group => upper("finance_department"));

將數據表自變數傳遞至UDTF

除了純量輸入自變數之外,Python UDF 還可以接受輸入數據表做為自變數。 單一 UDTF 也可以接受數據表自變數和多個純量自變數。

然後,任何 SQL 查詢都可以使用 關鍵字來提供輸入資料表, TABLE 後面接著括弧括在適當的數據表標識符周圍,例如 TABLE(t)。 或者,您可以傳遞資料表子查詢,例如 TABLE(SELECT a, b, c FROM t)TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))

然後,輸入數據表自變數會表示為 pyspark.sql.Row 方法的自變數 eval ,並針對輸入數據表中的每個數據列呼叫 eval 方法。 您可以使用標準 PySpark 資料行欄位批注來與每個資料列中的數據行互動。 下列範例示範明確匯入 PySpark Row 類型,然後篩選欄位上傳遞的 id 資料表:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

若要查詢函式,請使用 TABLE SQL 關鍵詞:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

從函數調用指定輸入數據列的數據分割

使用數據表自變數呼叫 UDTF 時,任何 SQL 查詢都可以根據一或多個輸入資料表數據行的值,將輸入數據表分割成數個 UDTF 呼叫。

若要指定分割區,請在 自變數之後TABLE,使用PARTITION BY函數調用中的 子句。 這可確保每個數據分割數據行值唯一組合的所有輸入數據列,都只會由UDTF類別的一個實例取用。

請注意,除了簡單的數據行參考之外, PARTITION BY 子句也會根據輸入數據表數據行接受任意表達式。 例如,您可以指定 LENGTH 字串的 、從日期擷取月份,或串連兩個值。

您也可以指定 WITH SINGLE PARTITION ,而不是 PARTITION BY 只要求一個數據分割,其中所有輸入數據列都必須由 UDTF 類別的一個實例取用。

在每個分割區中,您可以選擇性地指定輸入數據列的必要順序,因為 UDTF 的 eval 方法會取用它們。 若要這樣做,請在ORDER BY上述 或 WITH SINGLE PARTITION 子句之後PARTITION BY提供 子句。

例如,請考慮下列 UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

在輸入資料表上呼叫 UDTF 時,您可以指定資料分割選項:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

從方法指定輸入數據列 analyze 的數據分割

請注意,針對上述在 SQL 查詢中呼叫 UDF 時分割輸入數據表的方式,UDTF analyze 的方法會自動指定相同的數據分割方法。

  • 您可以更新 analyze 方法來設定欄位partitionBy=[PartitioningColumn("a")],而不需要呼叫 UDTF 作為 SELECT * FROM udtf(TABLE(t) PARTITION BY a),而只需使用 SELECT * FROM udtf(TABLE(t))來呼叫 函式即可。
  • 透過相同的令牌,您可以設定字段,orderBy=[OrderingColumn("b")]然後直接傳遞 TABLE(t),而不是在 SQL 查詢中指定 TABLE(t) WITH SINGLE PARTITION ORDER BY bwithSinglePartition=true analyze
  • 您可以設定 select=[SelectedColumn("a")] analyze ,然後直接傳遞 TABLE(t),而不是傳入 TABLE(SELECT a FROM t) SQL 查詢。

在下列範例中, analyze 傳回常數輸出架構、從輸入數據表選取數據行的子集,並指定輸入數據表會根據數據行的值 date ,分割成數個 UDTF 呼叫:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])