Python ユーザー定義テーブル関数とは
重要
この機能はパブリック プレビュー段階にあります。
ユーザー定義テーブル関数 (UDTF) を使用すると、スカラー値の代わりにテーブルを返す関数を登録できます。 UDTF の関数は、SQL クエリで参照される場合、共通テーブル式 (CTE) と同様に機能します。 SQL ステートメントのFROM
句で UDTF を参照し、結果に追加の Spark SQL 演算子を連結できます。
UDTF はローカルの SparkSession に登録され、ノートブックまたはジョブ レベルで分離されます。
UDTF は、割り当て済みまたは分離なしの共有アクセス モードで構成されたコンピューティングでサポートされます。 共有アクセス モードでは UDTF を使用できません。
UDTF を Unity Catalog のオブジェクトとして登録することはできず、UDTF を SQL ウェアハウスで使用することはできません。
UDTF の基本構文について
Apache Spark では、Python UDTF を Python クラスとして実装し、必須の eval
メソッドを使用します。
yield
を使用して、結果を行として出力します。
Apache Spark でクラスを UDTF として使用するには、PySpark の udtf
関数をインポートする必要があります。
Databricks では、この関数をデコレーターとして使用し、returnType
オプションを使用して、フィールド名と型を常に明示的に指定することをお勧めします。
次の例では、UDTF を使用してスカラー入力から単純なテーブルを作成します。
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
def eval(self, x: int, y: int):
yield x + y, x - y
SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# | 3| -1|
# +----+-----+
Python の *args
構文を使用し、不特定の数の入力値を処理するロジックを実装できます。 次の例では、引数の入力長と型を明示的に確認しながら、同じ結果が返されます。
@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# | 3| -1|
# +----+-----+
UDTF を登録する
次の構文を使用して、SQL クエリで使用するために UDTF を現在の SparkSession に登録できます。
spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)
次の例では、Python UDTF を SQL に登録します。
spark.udtf.register("simple_udtf", SimpleUDTF)
登録すると、次の例のように %sql
マジック コマンドまたは spark.sql()
関数を使用して、SQL で UDTF を使用できます。
%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")
結果の生成
Python UDTF は、結果を返すために yield
を使用して実装されます。 結果は常に、指定されたスキーマを持つ 0 行以上の行を含むテーブルとして返されます。
スカラー引数を渡すと、eval
メソッド内のロジックは、一連のスカラー引数が渡された状態で 1 回だけ実行されます。 テーブル引数の場合、eval
メソッドは入力テーブルの各行に対して 1 回実行されます。
ロジックは、入力ごとに 0、1、または多数の行を返すように記述できます。
次の UDTF は、項目をコンマ区切りリストから個別のエントリに分離して、入力ごとに 0 行以上を返す方法を示しています。
from pyspark.sql.functions import udtf
@udtf(returnType="id: int, item: string")
class Itemize:
def eval(self, id: int, item_list: str):
items = item_list.split(",")
for item in items:
if item != "":
yield id, item
UDTF にテーブル引数を渡す
SQL キーワード TABLE()
を使用して、UDTF にテーブル引数を渡すことができます。 次の例のように、テーブル名またはクエリを使用できます。
TABLE(table_name);
TABLE(SELECT * FROM table_name);
テーブル引数は、一度に 1 行ずつ処理されます。 標準の PySpark 列フィールドの注釈を使用して、各行の列を操作できます。 次の例では、PySpark の Row
型を明示的にインポートし、渡されたテーブルを id
フィールドでフィルター処理する方法を示します。
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# | 6|
# | 7|
# | 8|
# | 9|
# +---+
UDTF にスカラー引数を渡す
次の値の任意の組み合わせを使用して、UDTF にスカラー引数を渡すことができます。
- スカラー定数
- スカラー関数
- リレーション内のフィールド
リレーション内のフィールドを渡すには、UDTF を登録し、SQL の LATERAL
キーワードを使用する必要があります。
Note
インライン テーブルの別名を使用して、列のあいまいさを解消できます。
次の例では、LATERAL
を使用してテーブルから UDTF にフィールドを渡す方法を示します。
from pyspark.sql.functions import udtf
@udtf(returnType="id: int, item: string")
class Itemize:
def eval(self, id: int, item_list: str):
items = item_list.split(",")
for item in items:
if item != "":
yield id, item
spark.udtf.register("itemize", Itemize)
spark.sql("""
SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
(2, 'spoons,'),
(3, ''),
(4, 'knives,cups') t(id, item_list),
LATERAL itemize(id, item_list) b
""").show()
UDTF の既定値を設定する
必要に応じて、Python ロジックで参照できるクラス変数の既定値を設定する __init__
メソッドを実装できます。
この __init__
メソッドは引数を受け入れず、SparkSession の変数や状態情報にアクセスできません。
UDTF で Apache Arrow を使用する
Databricks では、入力として少量のデータを受信するが、大きなテーブルを出力する UDTF には Apache Arrow を使用することをお勧めしています。
次の例のように、UDTF を宣言するときに useArrow
パラメーターを指定することで、Arrow を有効にすることができます。
from pyspark.sql.functions import udtf
@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
def eval(self, x: int):
yield x, x + 1