次の方法で共有


pandas ユーザー定義 関数

pandas のユーザー定義関数 (UDF) (ベクター化 UDF とも呼ばれます) は、Apache Arrow を使用してデータを転送し、pandas がデータを操作するためのユーザー定義関数です。 pandas UDF を使用すると、一度に 1 行の Python UDF と比べて、最大 100 倍までにパフォーマンスを向上させる可能性があるベクター化の操作が可能になります。

背景情報については、ブログ記事「Apache Spark 3.0 の今後のリリースにおける新しい Pandas UDF と Python の型ヒント」を参照してください。

pandas UDF をキーワード pandas_udf をデコレーターとして使用して定義し、関数を Python 型ヒントと一緒にラップします。 この記事では、さまざまな種類の pandas UDF について説明し、pandas UDF を型ヒントと一緒に使用する方法を示します。

シリーズからシリーズへの UDF

あるシリーズからシリーズ pandas UDF へのベクター化を使用して、スカラー操作を行うことができます。 これらは、selectwithColumn などの API と一緒に使用できます。

Python 関数は、pandas シリーズを入力として受け取り、同じ長さの pandas シリーズを返す必要があります。また、Python の型ヒントでこれらを指定する必要があります。 Spark は、列をバッチに分割し、各バッチの関数をデータのサブセットとして呼び出し、それらの結果を連結することで、pandas UDF を実行します。

次の例は、2 つの列の積を計算する pandas UDF を作成する方法を示しています。

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

UDF 系列の反復子への系列の反復子

反復子 UDF は、次の点を除いて、スカラー pandas UDF と同じです。

  • Python 関数
    • 入力として、1 つの入力バッチでなく、バッチの反復子を取得します。
    • 1 つの出力バッチでなく、出力バッチの反復子を返します。
  • 反復子内の出力全体の長さは、入力全体の長さと同じである必要があります。
  • ラップされた pandas UDF は、1 つの Spark 列を入力として受け取ります。

Python の型ヒントは Iterator[pandas.Series] ->Iterator[pandas.Series] として指定する必要があります。

この pandas UDF は、UDF の実行で何らかの状態を初期化する必要がある場合に便利です。たとえば、機械学習モデル ファイルを読み込んですべての入力バッチに推論を適用する場合などです。

次の例は、反復子をサポートする pandas UDF を作成する方法を示しています。

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

系列 UDF の反復子への複数の系列の反復子

系列 UDF の反復子への複数の系列の反復子は、系列 UDF を反復するための系列の反復子と同様の特性と制限を持ちます。 指定された関数は、バッチの反復子を受け取り、バッチの反復子を出力します。 また、UDF の実行で何らかの状態を初期化する必要がある場合にも便利です。

違いは次のとおりです。

  • 基になる Python 関数は、pandas シリーズのタプルの反復子を受け取ります。
  • ラップされた pandas UDF は、複数の Spark 列を入力として受け取ります。

型ヒントは Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series] として指定します。

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

シリーズからスカラー UDF へ

シリーズからのスカラー pandas UDF は、Spark 集計関数に似ています。 シリーズからスカラー pandas UDF は、1 つまたは複数の pandas シリーズからスカラー値までの集計を定義します。各 pandas シリーズは Spark 列を表します。 シリーズからスカラー pandas UDF を selectwithColumngroupBy.aggpyspark.sql.Window などの API と一緒に使用します。

型ヒントは pandas.Series, ... ->Any として表現します。 戻り値の型はプリミティブ データ型である必要があり、返されるスカラーは、intfloat などの Python プリミティブ型、または numpy.int64numpy.float64 などの NumPy データ型のいずれかにできます。 Any は、理想的には特定のスカラー型であるべきです。

この種類の UDF では、部分的な集計はサポートされず、各グループのすべてのデータがメモリに読み込まれます。

次の例では、この型の UDF を使用して、selectgroupBywindow の各操作で平均を計算する方法を示します。

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

使用法の詳細については、「 pyspark.sql.functions.pandas_udf」を参照してください。

使用法

Arrow バッチ サイズの設定

Note

この構成は、共有アクセス モードと Databricks Runtime 13.3 LTS から 14.2 で構成されたコンピューティングには影響しません。

Spark のデータ パーティションは、Arrow レコード バッチに変換されます。これにより、JVM のメモリ使用率が一時的に増大する可能性があります。 メモリ不足の例外を回避するには、spark.sql.execution.arrow.maxRecordsPerBatch 構成を各バッチの最大行数を決定する整数に設定して、Arrow レコード バッチのサイズを調整します。 既定値は、バッチあたり 1 万個のレコードです。 列数が多い場合は、それに応じて値を調整する必要があります。 この制限を使用すると、各データ パーティションは処理のために 1 つ以上のレコードバッチに分割されます。

タイム ゾーン セマンティクスがあるタイムスタンプ

Spark は内部的に UTC 値としてタイムスタンプを格納します。指定されたタイム ゾーンを使用せずに取り込まれたタイムスタンプ データは、現地時刻として UTC に変換されます (マイクロ秒単位)。

タイムスタンプ データを Spark でエクスポートまたは表示する場合、セッション タイム ゾーンを使用してタイムスタンプ値がローカライズされます。 セッションのタイム ゾーンは spark.sql.session.timeZone 構成を使用して設定され、既定では JVM システムのローカル タイム ゾーンに設定されます。 pandas は、列ごとにオプションのタイム ゾーンを使用して、datetime64 型とナノ秒の解像度 datetime64[ns] を使用します。

タイムスタンプ データを Spark から pandas に転送する場合は、ナノ秒に変換され、各列は Spark セッションのタイム ゾーンに変換され、その後そのタイム ゾーンにローカライズされます。これにより、タイム ゾーンが削除され、値が現地時間として表示されます。 このエラーは、toPandas() または timestamp 列を持つ pandas_udf を呼び出したときに発生します。

データが pandas から Spark に転送されると、UTC マイクロ秒に変換されます。 このエラーは、pandas DataFrame を使用して createDataFrame を呼び出したとき、または pandas UDF から timestamp を返したときに発生します。 これらの変換は自動的に行われ、Spark が予期される形式でデータを持つことができるようになります。そのため、これらの変換を自分で行う必要はありません。 ナノ秒の値は切り捨てられます。

標準 UDF は、タイムスタンプ データを Python の datetime オブジェクトとして読み込みます。これは、pandas タイムスタンプとは異なります。 最高のパフォーマンスを得るには、pandas UDF でタイムスタンプを操作するときは、pandas の時系列機能を使用することをお勧めします。 詳細については、時系列と日付の機能に関する記事を参照してください。

ノートブックの例

次のノートブックは、pandas UDF で実現できるパフォーマンスの向上を示しています。

Pandas UDF ベンチマーク ノートブック

ノートブックを入手