Funkcje biblioteki Pandas zdefiniowane przez użytkownika

Funkcja zdefiniowana przez użytkownika (UDF) pandas, znana również jako wektoryzowana UDF, to funkcja zdefiniowana przez użytkownika, która używa narzędzia Apache Arrow do przesyłania danych oraz biblioteki pandas do pracy z danymi. Funkcje zdefiniowane przez użytkownika biblioteki pandas umożliwiają wektoryzowane operacje, które mogą zwiększyć wydajność nawet 100 razy w porównaniu z funkcjami zdefiniowanymi przez użytkownika języka Python wykonywanymi na poziomie wiersza.

Aby uzyskać podstawowe informacje, zobacz wpis na blogu New Pandas UDFs i Python Type Hints w nadchodzącej wersji Apache Spark 3.0.

Można zdefiniować pandas UDF przy użyciu słowa kluczowego jako dekoratora i opatrzyć funkcję typem języka Python . W tym artykule opisano różne typy funkcji zdefiniowanych przez użytkownika biblioteki pandas i pokazano, jak używać funkcji zdefiniowanych przez użytkownika biblioteki pandas z wskazówkami dotyczącymi typów.

Z serii na serię UDF

Używasz Pandas UDF typu Series do Series do wektorowania operacji skalarnych. Można ich używać z interfejsami API, takimi jak select i withColumn.

Funkcja języka Python musi zaakceptować serię pandas jako dane wejściowe i zwrócić serię pandas o tej samej długości. Określ te typy przy użyciu wskazówek dotyczących typów języka Python. Platforma Spark uruchamia funkcję UDF biblioteki pandas, dzieląc dane na partie wierszy, wywołując funkcję dla każdej partii, a następnie łącząc wyniki.

W poniższym przykładzie pokazano, jak utworzyć UDF (funkcję zdefiniowaną przez użytkownika) w bibliotece pandas, która oblicza iloczyn dwóch kolumn.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Iterator serii do iteratora funkcji zdefiniowanej przez użytkownika

Funkcja UDF iteratora jest taka sama jak skalarna funkcja UDF w pandas, z wyjątkiem:

  • Funkcja języka Python
    • Przyjmuje iterator partii zamiast pojedynczej partii wejściowej jako dane wejściowe.
    • Zwraca iterator paczek wyjściowych zamiast pojedynczej paczki wyjściowej.
  • Długość całych danych wyjściowych w iteratorze powinna być taka sama jak długość całego wejścia.
  • Opakowana funkcja UDF biblioteki pandas przyjmuje pojedynczą kolumnę platformy Spark jako dane wejściowe.

Należy określić wskazówkę typu języka Python jako Iterator[pandas.Series] ->Iterator[pandas.Series].

Funkcja UDF pandas jest przydatna, gdy jej wykonanie wymaga zainicjowania określonego stanu, na przykład załadowania pliku modelu uczenia maszynowego w celu zastosowania wnioskowania na każdej partii danych wejściowych.

W poniższym przykładzie pokazano, jak utworzyć pandas UDF z obsługą iteratora.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Iterator wielu serii na iterator serii w funkcji UDF

Iterator wielu serii do iteratora UDF serii ma podobne cechy i ograniczenia jak iterator serii do iteratora UDF serii. Określona funkcja przyjmuje iterator partii i generuje iterator partii. Jest to również przydatne, gdy wykonanie funkcji zdefiniowanej przez użytkownika wymaga zainicjowania określonego stanu.

Różnice są następujące:

  • Podstawowa funkcja w języku Python przyjmuje iterator krotki z serii pandas.
  • Opakowana funkcja pandas UDF przyjmuje wiele kolumn frameworka Spark jako dane wejściowe.

Należy określić wskazówki dotyczące typu jako Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Seria do skalarnej UDF

Serie do skalarnych funkcji zdefiniowanych przez użytkownika biblioteki pandas są podobne do funkcji agregujących platformy Spark. Funkcja UDF z serii do skalara pandas definiuje agregację z co najmniej jednej serii pandas do wartości skalarnej, gdzie każda seria pandas reprezentuje kolumnę Spark. Używasz serii do skalowania biblioteki pandas UDF z interfejsami API, takimi jak select, withColumn, groupBy.aggi pyspark.sql.Window.

Wyrażasz wskazówkę typu jako pandas.Series, ... ->Any. Zwracany typ powinien być typem danych pierwotnych, a zwracany skalar może być typem pierwotnym języka Python, na przykład lub typem danych NumPy, int takim jak float lub numpy.int64numpy.float64. Any najlepiej być określonym typem skalarnym.

Ten rodzaj funkcji zdefiniowanej przez użytkownika nie obsługuje częściowej agregacji, a wszystkie dane dla każdej grupy są ładowane do pamięci.

W poniższym przykładzie pokazano, jak używać tego typu funkcji zdefiniowanej przez użytkownika do obliczania średniej z operacjami select, groupBy i window.

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Aby uzyskać szczegółowe informacje o użyciu, zobacz pyspark.sql.functions.pandas_udf.

Użycie

Ustawianie rozmiaru partii Arrow

Uwaga

Ta konfiguracja nie ma wpływu na obliczenia skonfigurowane przy użyciu standardowego trybu dostępu i środowiska Databricks Runtime 13.3 LTS do wersji 14.2.

Partycje danych w Spark są konwertowane na partie rekordów Arrow, co może tymczasowo prowadzić do wysokiego użycia pamięci w JVM. Aby uniknąć możliwych wyjątków braku pamięci, można dostosować rozmiar partii rekordów strzałki, ustawiając spark.sql.execution.arrow.maxRecordsPerBatch konfigurację na liczbę całkowitą, która określa maksymalną liczbę wierszy dla każdej partii. Domyślna ilość to 10 000 danych na partię. Jeśli liczba kolumn jest duża, należy odpowiednio dostosować wartość. Korzystając z tego limitu, każda partycja danych jest podzielona na co najmniej 1 partie rekordów do przetwarzania.

Sygnatura czasowa z semantyką strefy czasowej

Platforma Spark wewnętrznie przechowuje znaczniki czasu w formie wartości UTC, a dane znacznika czasu wprowadzone bez określonej strefy czasowej są traktowane jako czas lokalny i konwertowane na UTC z mikrosekundową rozdzielczością.

Gdy dane sygnatury czasowej są eksportowane lub wyświetlane na platformie Spark, strefa czasowa sesji służy do lokalizowania wartości znacznika czasu. Strefa czasowa sesji jest ustawiana z konfiguracją spark.sql.session.timeZone i domyślnie ustawiona na lokalną strefę czasową systemu JVM. Biblioteka pandas używa datetime64 typu z rozdzielczością nanosekundową, datetime64[ns], z opcjonalną strefą czasową w zależności od kolumny.

Gdy dane sygnatur czasowych są przesyłane ze Spark do pandas, są najpierw konwertowane na nanosekundy, a następnie każda kolumna jest przekształcana do strefy czasowej sesji Spark, po czym wartości są lokalizowane w tej strefie czasowej. Proces ten usuwa informacje o strefie czasowej i wyświetla wartości jako czas lokalny. Dzieje się tak podczas wywoływania toPandas() lub pandas_udf z kolumnami znaczników czasu.

Gdy dane znaczników czasowych są przesyłane z biblioteki pandas do platformy Spark, są konwertowane na mikrosekundy UTC. Dzieje się tak w przypadku wywoływania createDataFrame z DataFrame pandas lub zwracania znacznika czasu z UDF pandas. Te konwersje są wykonywane automatycznie, aby upewnić się, że platforma Spark ma dane w oczekiwanym formacie, więc nie jest konieczne samodzielne wykonanie żadnej z tych konwersji. Wszelkie wartości nanosekund są obcinane.

Standardowa funkcja UDF ładuje dane sygnatury czasowej jako obiekty daty/godziny języka Python, które różnią się od sygnatury czasowej biblioteki pandas. Aby uzyskać najlepszą wydajność, zalecamy używanie funkcji szeregów czasowych biblioteki pandas w pracy ze znacznikami czasu w funkcji zdefiniowanej przez użytkownika (UDF) biblioteki pandas. Aby uzyskać szczegółowe informacje, zobacz Funkcje szeregów czasowych/dat.

Przykładowy notatnik

W poniższym notebooku przedstawiono ulepszenia wydajności, które można osiągnąć za pomocą funkcji Pandas UDF.

Notatnik testów porównawczych funkcji zdefiniowanych przez użytkownika w pandas

Pobierz notatnik