Share via


Python ユーザー定義テーブル関数とは

重要

この機能はパブリック プレビュー段階にあります。

ユーザー定義テーブル関数 (UDTF) を使用すると、スカラー値の代わりにテーブルを返す関数を登録できます。 UDTF の関数は、SQL クエリで参照される場合、共通テーブル式 (CTE) と同様に機能します。 SQL ステートメントのFROM 句で UDTF を参照し、結果に追加の Spark SQL 演算子を連結できます。

UDTF はローカルの SparkSession に登録され、ノートブックまたはジョブ レベルで分離されます。

UDTF は、割り当て済みまたは分離なしの共有アクセス モードで構成されたコンピューティングでサポートされます。 共有アクセス モードでは UDTF を使用できません。

UDTF を Unity Catalog のオブジェクトとして登録することはできず、UDTF を SQL ウェアハウスで使用することはできません。

UDTF の基本構文について

Apache Spark では、Python UDTF を Python クラスとして実装し、必須の eval メソッドを使用します。

yield を使用して、結果を行として出力します。

Apache Spark でクラスを UDTF として使用するには、PySpark の udtf 関数をインポートする必要があります。

Databricks では、この関数をデコレーターとして使用し、returnType オプションを使用して、フィールド名と型を常に明示的に指定することをお勧めします。

次の例では、UDTF を使用してスカラー入力から単純なテーブルを作成します。

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, x: int, y: int):
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Python の *args 構文を使用し、不特定の数の入力値を処理するロジックを実装できます。 次の例では、引数の入力長と型を明示的に確認しながら、同じ結果が返されます。

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

UDTF を登録する

次の構文を使用して、SQL クエリで使用するために UDTF を現在の SparkSession に登録できます。

spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)

次の例では、Python UDTF を SQL に登録します。

spark.udtf.register("simple_udtf", SimpleUDTF)

登録すると、次の例のように %sql マジック コマンドまたは spark.sql() 関数を使用して、SQL で UDTF を使用できます。

%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")

結果の生成

Python UDTF は、結果を返すために yield を使用して実装されます。 結果は常に、指定されたスキーマを持つ 0 行以上の行を含むテーブルとして返されます。

スカラー引数を渡すと、eval メソッド内のロジックは、一連のスカラー引数が渡された状態で 1 回だけ実行されます。 テーブル引数の場合、eval メソッドは入力テーブルの各行に対して 1 回実行されます。

ロジックは、入力ごとに 0、1、または多数の行を返すように記述できます。

次の UDTF は、項目をコンマ区切りリストから個別のエントリに分離して、入力ごとに 0 行以上を返す方法を示しています。

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

UDTF にテーブル引数を渡す

SQL キーワード TABLE() を使用して、UDTF にテーブル引数を渡すことができます。 次の例のように、テーブル名またはクエリを使用できます。

TABLE(table_name);
TABLE(SELECT * FROM table_name);

テーブル引数は、一度に 1 行ずつ処理されます。 標準の PySpark 列フィールドの注釈を使用して、各行の列を操作できます。 次の例では、PySpark の Row 型を明示的にインポートし、渡されたテーブルを id フィールドでフィルター処理する方法を示します。

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

UDTF にスカラー引数を渡す

次の値の任意の組み合わせを使用して、UDTF にスカラー引数を渡すことができます。

  • スカラー定数
  • スカラー関数
  • リレーション内のフィールド

リレーション内のフィールドを渡すには、UDTF を登録し、SQL の LATERAL キーワードを使用する必要があります。

Note

インライン テーブルの別名を使用して、列のあいまいさを解消できます。

次の例では、LATERAL を使用してテーブルから UDTF にフィールドを渡す方法を示します。

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

spark.udtf.register("itemize", Itemize)

spark.sql("""
    SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
    (2, 'spoons,'),
    (3, ''),
    (4, 'knives,cups') t(id, item_list),
    LATERAL itemize(id, item_list) b
""").show()

UDTF の既定値を設定する

必要に応じて、Python ロジックで参照できるクラス変数の既定値を設定する __init__ メソッドを実装できます。

この __init__ メソッドは引数を受け入れず、SparkSession の変数や状態情報にアクセスできません。

UDTF で Apache Arrow を使用する

Databricks では、入力として少量のデータを受信するが、大きなテーブルを出力する UDTF には Apache Arrow を使用することをお勧めしています。

次の例のように、UDTF を宣言するときに useArrow パラメーターを指定することで、Arrow を有効にすることができます。

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1