Benutzerdefinierte Pandas-Funktionen

Eine benutzerdefinierte Pandas-Funktion (User-Defined Function, UDF), die auch als Vektor-UDF bezeichnet wird, ist eine benutzerdefinierte Funktion, die Apache Arrow zum Übertragen von Daten und Pandas zum Arbeiten mit den Daten verwendet. Pandas-UDFs ermöglichen Vektoroperationen, die die Leistung im Vergleich zu zeilenweisen Python-UDFs bis um das 100-fache steigern können.

Weitere Informationen finden Sie im Blogbeitrag Neue Pandas-UDFs und Python-Typhinweise in der nächsten Version von Apache Spark 3.0.

Sie definieren eine Pandas-UDF mit dem Schlüsselwort pandas_udf als Decorator und umschließen die Funktion mit einem Python-Typhinweis. In diesem Artikel werden die verschiedenen Typen von Pandas-UDFs beschrieben, und es wird veranschaulicht, wie Pandas-UDFs mit Typhinweisen verwendet werden.

UDF vom Typ „Serie zu Serie“

Sie verwenden eine Pandas-UDF vom Typ „Serie zu Serie“, um skalare Operationen zu vektorisieren. Sie können sie mit APIs wie select und withColumn verwenden.

Die Python-Funktion sollte eine Pandas-Serie als Eingabe verwenden und eine Pandas-Serie der gleichen Länge zurückgeben. Sie sollten diese in den Python-Typhinweisen angeben. Spark führt eine Pandas-UDF aus, indem Spalten in Batches aufgeteilt werden, die Funktion für jeden Batch als Teilmenge der Daten aufgerufen wird und dann die Ergebnisse verkettet werden.

Im folgenden Beispiel wird veranschaulicht, wie Sie eine Pandas-UDF erstellen, die das Produkt aus zwei Spalten berechnet.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

UDF vom Typ „Iterator der Serie zu Iterator der Serien“

Eine Iterator-UDF ist abgesehen von den folgenden Punkten mit einer skalaren Pandas-UDF vergleichbar:

  • Die Python-Funktion:
    • Verwendet einen Iterator von Batches anstelle eines einzelnen Eingabebatches als Eingabe.
    • Gibt einen Iterator von Ausgabebatches anstelle eines einzelnen Ausgabebatches zurück.
  • Die Länge der gesamten Ausgabe im Iterator sollte der Länge der gesamten Eingabe entsprechen.
  • Die umschlossene Pandas-UDF verwendet eine einzelne Spark-Spalte als Eingabe.

Sie sollten den Python-Typhinweis als Iterator[pandas.Series] ->Iterator[pandas.Series] angeben.

Diese Pandas-UDF ist nützlich, wenn die Ausführung der benutzerdefinierten Funktion eine Initialisierung eines Zustands erfordert, z. B. das Laden einer Machine Learning-Modelldatei, um Rückschlüsse auf jeden Eingabebatch anzuwenden.

Im folgenden Beispiel wird veranschaulicht, wie Sie eine Pandas-UDF mit Iteratorunterstützung erstellen.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

UDF vom Typ „Iterator[Tuple[pd.Series, pd.Series]] -> Iterator[pd.Series]“

Eine UDF vom Typ „Iterator[Tuple[pd.Series, pd.Series]] -> Iterator[pd.Series]“ weist ähnliche Merkmale und Einschränkungen auf wie eine UDF vom Typ „Iterator[pd.Series] -> Iterator[pd.Series]“. Die angegebene Funktion verwendet einen Iterator von Batches und gibt einen Iterator von Batches aus. Diese Funktion ist auch nützlich, wenn die Ausführung der benutzerdefinierten Funktion eine Initialisierung eines Zustands erfordert.

Folgende Unterschiede bestehen:

  • Die zugrunde liegende Python-Funktion verwendet einen Iterator eines Tupels der Pandas-Serie.
  • Die umschlossene Pandas-UDF verwendet mehrere Spark-Spalten als Eingabe.

Sie geben die Typhinweise als Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series] an.

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

UDF vom Typ „Serie zu Skalar“

Pandas-UDFs vom Typ „Serie zu Skalar“ sind mit Spark-Aggregatfunktionen vergleichbar. Eine Pandas-UDF vom Typ „Serie zu Skalar“ definiert eine Aggregation von einer oder mehreren Pandas-Serien zu einem Skalarwert, wobei jede Pandas-Serie eine Spark-Spalte darstellt. Sie verwenden eine Pandas-UDF vom Typ „Serie zu Skalar“mit APIs wie select, withColumn, groupBy.agg und pyspark.sql.Window.

Sie drücken den Typhinweis als pandas.Series, ... ->Any aus. Der Rückgabetyp sollte ein primitiver Datentyp sein, und der zurückgegebene Skalar kann entweder ein primitiver Python-Typ wie int oder float oder ein NumPy-Datentyp wie numpy.int64 oder numpy.float64 sein. Any sollte idealerweise ein bestimmter Skalartyp sein.

Partielle Aggregationen werden von diesem UDF-Typ nicht unterstützt, und alle Daten für jede Gruppe werden in den Arbeitsspeicher geladen.

Im folgenden Beispiel wird veranschaulicht, wie dieser UDF-Typ zum Berechnen des Mittelwerts mit select-, groupBy- und window-Operationen verwendet wird:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Ausführliche Informationen zur Verwendung finden Sie unter pyspark.sql.functions.pandas_udf.

Verwendung

Festlegen der Arrow-Batchgröße

Hinweis

Diese Konfiguration hat keine Auswirkungen auf Computeressourcen, die mit dem Zugriffsmodus „Freigegeben“ und Databricks Runtime 13.3 LTS bis 14.2 konfiguriert wurden.

Datenpartitionen in Spark werden in Arrow-Datensatzbatches konvertiert, was vorübergehend zu einer hohen Speicherauslastung in der JVM führen kann. Um mögliche Ausnahmen aufgrund von nicht genügend Arbeitsspeicher zu vermeiden, können Sie die Größe der Arrow-Datensatzbatches anpassen, indem Sie die spark.sql.execution.arrow.maxRecordsPerBatch-Konfiguration auf eine ganze Zahl festlegen, die die maximale Anzahl von Zeilen für jeden Batch bestimmt. Der Standardwert ist 10.000 Datensätze pro Batch. Bei einer großen Anzahl von Spalten sollte der Wert entsprechend angepasst werden. Mit diesem Grenzwert wird jede Datenpartition zur Verarbeitung in einen oder mehrere Datensatzbatches unterteilt.

Semantik für Zeitstempel mit Zeitzone

Spark speichert Zeitstempel intern als UTC-Werte, und ohne angegebene Zeitzone übertragene Zeitstempeldaten werden als Ortszeit in UTC mit Mikrosekundenauflösung konvertiert.

Wenn Zeitstempeldaten in Spark exportiert oder angezeigt werden, wird die Sitzungszeitzone verwendet, um die Zeitstempelwerte zu lokalisieren. Die Sitzungszeitzone wird mit der spark.sql.session.timeZone-Konfiguration festgelegt und ist standardmäßig auf die lokale Zeitzone des JVM-Systems festgelegt. Pandas verwendet einen datetime64-Typ mit Nanosekundenauflösung (datetime64[ns]) mit optionaler Zeitzone pro Spalte.

Beim Übertragen von Zeitstempeldaten von Spark in Pandas werden diese in Nanosekunden konvertiert, und jede Spalte wird in die Spark-Sitzungszeitzone konvertiert und dann in diese Zeitzone lokalisiert, wodurch die Zeitzone entfernt wird und die Werte als Ortszeit angezeigt werden. Dies geschieht beim Aufrufen von toPandas() oder pandas_udf mit Zeitstempelspalten.

Beim Übertragen von Zeitstempeldaten von Pandas in Spark werden diese in UTC-Mikrosekunden konvertiert. Dies geschieht beim Aufrufen von createDataFrame mit einem Pandas-Datenrahmen oder bei der Rückgabe eines Zeitstempels von einer Pandas-UDF. Diese Konvertierungen erfolgen automatisch, um sicherzustellen, dass Spark über Daten im erwarteten Format verfügt, sodass Sie keine dieser Konvertierungen selbst durchführen müssen. Alle Nanosekundenwerte werden abgeschnitten.

Eine Standard-UDF lädt Zeitstempeldaten als Python-datetime-Objekte, die sich von einem Pandas-Zeitstempel unterscheiden. Um die beste Leistung zu erzielen, wird empfohlen, die Pandas-Zeitreihenfunktion zu verwenden, wenn Sie mit Zeitstempeln in einer Pandas-UDF arbeiten. Weitere Informationen finden Sie unter Zeitreihen-/Datumsfunktion.

Notebook mit Beispielen

Das folgende Notebook veranschaulicht die Leistungsverbesserungen, die Sie mit Pandas-UDFs erzielen können:

Notebook „Pandas UDFs Benchmark“

Notebook abrufen