Share via


什麼是 Python 使用者定義資料表函式?

重要

這項功能處於公開預覽狀態

使用者定義的資料表函式 (UDTF) 可讓您註冊傳回資料表而非純量值的函式。 在 SQL 查詢中參考時,UDDF 函式類似于通用資料表運算式 (CTE)。 您可以在 SQL 語句的 子句中 FROM 參考 UDF,而且您可以將其他 Spark SQL 運算子鏈結至結果。

UDDF 會註冊到本機 SparkSession,並在筆記本或作業層級隔離。

在以指派或無隔離共用存取模式設定的計算上,支援 UDF。 您無法在共用存取模式上使用 UDF。

您無法將 UDDF 註冊為 Unity 目錄中的物件,而 UDDF 不能與 SQL 倉儲搭配使用。

UDTF 的基本語法為何?

Apache Spark 會使用強制 eval 方法,以 Python 類別的形式實作 Python UDF。

您可以使用 發出結果作為資料列 yield

若要讓 Apache Spark 使用類別作為 UDTF,您必須匯入 PySpark udtf 函式。

Databricks 建議使用此函式作為裝飾專案,並且一律使用 returnType 選項明確指定功能變數名稱和類型。

下列範例會使用 UDTF 從純量輸入建立簡單的資料表:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, x: int, y: int):
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

您可以使用 Python *args 語法並實作邏輯來處理未指定的輸入值數目。 下列範例會傳回相同的結果,同時明確檢查引數的輸入長度和類型:

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

註冊 UDTF

您可以使用下列語法向目前的 SparkSession 註冊 UDTF,以用於 SQL 查詢:

spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)

下列範例會將 Python UDTF 註冊至 SQL:

spark.udtf.register("simple_udtf", SimpleUDTF)

註冊之後,您可以使用 magic 命令或 spark.sql() 函式在 SQL %sql 中使用 UDTF,如下列範例所示:

%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")

產生結果

使用 實作 yield Python UDF 以傳回結果。 結果一律會以資料表的形式傳回,其中包含具有指定架構的 0 或多個資料列。

傳遞純量引數時,方法中的 eval 邏輯只會執行一次,並傳遞一組純量引數。 針對資料表引數,方法 eval 會針對輸入資料表中的每個資料列執行一次。

邏輯可以寫入以傳回每個輸入的 0、1 或多個資料列。

下列 UDTF 示範將專案與逗號分隔清單分隔為個別專案,為每個輸入傳回 0 或多個資料列:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

將資料表引數傳遞至 UDTF

您可以使用 SQL 關鍵字 TABLE() 將資料表引數傳遞至 UDTF。 您可以使用資料表名稱或查詢,如下列範例所示:

TABLE(table_name);
TABLE(SELECT * FROM table_name);

資料表引數會一次處理一個資料列。 您可以使用標準 PySpark 資料列欄位批註來與每個資料列中的資料行互動。 下列範例示範明確匯入 PySpark Row 類型,然後篩選欄位上傳遞的 id 資料表:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

將純量引數傳遞至 UDTF

您可以使用下列值的任何組合,將純量引數傳遞至 UDTF:

  • 純量常數
  • 純量函式
  • 關聯中的欄位

若要傳遞關聯中的欄位,您必須註冊 UDTF 並使用 SQL LATERAL 關鍵字。

注意

您可以使用內嵌資料表別名來厘清資料行。

下列範例示範如何使用 LATERAL 將資料表中的欄位傳遞至 UDTF:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

spark.udtf.register("itemize", Itemize)

spark.sql("""
    SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
    (2, 'spoons,'),
    (3, ''),
    (4, 'knives,cups') t(id, item_list),
    LATERAL itemize(id, item_list) b
""").show()

設定 UDDF 的預設值

您可以選擇性地實 __init__ 作方法來設定可在 Python 邏輯中參考的類別變數預設值。

方法 __init__ 不接受任何引數,而且無法存取 SparkSession 中的變數或狀態資訊。

搭配 UDDF 使用 Apache 箭頭

Databricks 建議針對 UDF 使用 Apache Arrow,以接收少量資料做為輸入,但輸出大型資料表。

您可以在宣告 UDTF 時指定 useArrow 參數來啟用 Arrow,如下列範例所示:

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1