分享方式:


Pandas 使用者定義函式

Pandas 的使用者定義函數 (UDF) - 也稱為向量化 UDF - 是一個使用者定義函數,使用 Apache Arrow 來傳輸資料,並使用 pandas 來處理資料。 Pandas UDF 允許向量化的作業,相較於逐行的 Python UDF,其效能可提升 100 倍。

如需背景資訊,請參閱部落格文章 New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0

您可以使用關鍵字pandas_udf作為裝飾項目來定義 Pandas UDF,並使用 Python 類型提示來包裝函數。 本文描述不同類型的 Pandas UDF,並顯示如何使用類型提示來使用 Pandas UDF。

數列對數列 UDF

您使用數列對數列 Pandas UDF 來向量化純量作業。 您可以將它們與 selectwithColumn 等 API 搭配使用。

Python 函式應該接受一個 Pandas 數列作為輸入,並傳回一個相同長度的 Pandas 數列,您應該在 Python 類型提示中指定這些數列。 Spark 執行 Pandas UDF 的方式是將資料欄分割成批次,以資料的子集來呼叫每個批次的函式,然後串連結果。

以下範例顯示如何建立一個 Pandas UDF,用來計算 2 資料欄的乘積。

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

數列迭代器對數列迭代器 UDF

迭代器 UDF 與純量 Pandas UDF 相同,除了:

  • Python 函式
    • 以批次的迭代器取代單一輸入批次作為輸入。
    • 傳回輸出批次的迭代器,而非單一輸出批次。
  • 迭代器中整個輸出的長度應該與整個輸入的長度相同。
  • 裝合後的 Pandas UDF 將單一 Spark 資料欄作為輸入。

您應將 Python 類型提示指定為 Iterator[pandas.Series] - >Iterator[pandas.Series]

當 UDF 執行需要初始化某些狀態時,這個 Pandas UDF 就很有用,例如,載入一個機器學習模型檔案,以套用推論到每個輸入批次。

下列範例顯示如何使用迭代器支援建立 Pandas UDF。

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

多重數列迭代器對數列迭代器 UDF

多重數列迭代器對數列迭代器 UDF 具有與數列迭代器對數列迭代器 UDF 類似的特性和限制。 指定函式接收批次的迭代器,並輸出批次的迭代器。 當 UDF 執行需要初始化某些狀態時,它也很有用。

差異包括:

  • 基礎的 Python 函式取得 Pandas 數列元組中的迭代器。
  • 裝合後的 Pandas UDF 將多重 Spark 資料欄作為輸入。

您會將類型提示指定為 Iterator[Tuple[pandas.Series, ...]] - >Iterator[pandas.Series]

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

數列對純量 UDF

數列對純量的 Pandas UDF 類似於 Spark 的彙總函式。 數列到純量的 Pandas UDF 定義了從一個或多個 Pandas 數列到純量的彙總,其中每個 Pandas 數列代表一個 Spark 資料欄。 您使用 selectwithColumngroupBy.aggpyspark.sql.Window 等 API 將數列對純量的 Pandas UDF。

您會將類型提示表示為 pandas.Series, ... - >Any。 傳回型別應是原始資料類型,傳回的純量可以是 Python 基本類型,例如 intfloat,或是 NumPy 資料類型,例如 numpy.int64numpy.float64Any 最好是特定的純量類型。

這種類型的 UDF 支援部分彙總,每個群組的所有資料都會載入記憶體。

下列範例顯示如何使用這種類型的 UDF 來計算具有 selectgroupBywindow 作業的平均值:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

如需詳細的使用方式,請參閱 pyspark.sql.functions.pandas_udf

使用方式

設定 Arrow 批次大小

注意

此設定對以共用存取模式設定的計算和 Databricks Runtime 13.3 LTS 至 14.2 沒有影響。

Spark 中的資料分割區會轉換成 Arrow 記錄批次,這會暫時導致 JVM 的記憶體使用量偏高。 為避免可能發生的記憶體不足例外情況,您可以將 spark.sql.execution.arrow.maxRecordsPerBatch 設定為整數,以調整 Arrow 記錄批次的大小,該整數決定每個批次的最大列數。 每個批次的預設值為 10,000 筆記錄。 如果資料欄數目很大,則應據以調整該值。 使用此限制,每個資料分割區會分成 1 個或多個記錄批次處理。

具有時區語意的時間戳記

Spark 內部會以 UTC 值儲存時間戳記,而沒有指定時區的時間戳記資料會以微秒解析度將當地時間轉換為 UTC。

在 Spark 中匯出或顯示時間戳記資料時,會將工作階段時區用來將時間戳記值本地化。 工作階段時區與 spark.sql.session.timeZone 設定一起設定,預設為 JVM 系統地區設定時區。 Pandas 使用具有奈秒解析度的 datetime64 類型 datetime64[ns],每資料欄可選擇時區。

當時間戳記資料從 Spark 傳輸到 Pandas 時,會轉換成奈秒,而且每一資料欄都會轉換成 Spark 工作階段時區,然後本地化為該時區,這樣就會移除時區,並以當地時間顯示數值。 使用時間戳記資料欄呼叫 toPandas()pandas_udf 時,就會發生此情況。

時間戳記資料從 Pandas 傳輸至 Spark 時,會轉換成 UTC 微秒。 當使用 Pandas DataFrame 呼叫 createDataFrame 或從 Pandas UDF 傳回時間戳記時,會發生這種情況。 這些轉換都是自動完成的,以確保 Spark 有預期格式的資料,所以不需要自己實施這些轉換。 任何奈秒值都會截斷。

標準的 UDF 會將時間戳記的資料載入為 Python 日期時間物件,這與 Pandas 的時間戳記不同。 為獲得最佳效能,我們建議您在 Pandas UDF 中處理時間戳記的時候,使用 Pandas 時間數列功能。 如需詳細資料,請參閱時間序列/日期功能

範例筆記本

下列的筆記本說明使用 Pandas UDF 可以達到的效能改善:

Pandas UDF 基準筆記本

取得筆記本