Megosztás a következőn keresztül:


a pandas felhasználó által megadott függvényei

A pandas felhasználó által definiált függvény (UDF) – más néven vektoros UDF – egy felhasználó által definiált függvény, amely az Apache Arrow használatával továbbítja az adatokat, a pandas pedig az adatokkal dolgozik. A pandas UDF-ek olyan vektoros műveleteket engedélyeznek, amelyek akár 100-szor is növelhetik a teljesítményt az egymás utáni Python UDF-ekhez képest.

A háttérinformációkat az Apache Spark 3.0 közelgő kiadásában, a New Pandas UDFs és a Python Type Hints című blogbejegyzésben találhatja meg.

Definiálhat egy pandas UDF-et a kulcsszóval pandas_udf dekorátorként, és a függvényt Python-típusmutatóval burkolhatja. Ez a cikk a pandas UDF-ek különböző típusait ismerteti, és bemutatja, hogyan használhatók a pandas UDF-ek típustippekkel.

Sorozat–sorozat UDF

A skaláris műveletek vektorizálásához használ egy Series to Series pandas UDF-t. Használhatja őket api-kkal, például select és withColumn.

A Python-függvénynek be kell vennie egy pandas Series-et bemenetként, és ugyanolyan hosszúságú pandas-sorozatot kell visszaadnia, és ezeket meg kell adnia a Python-típus tippjeiben. A Spark egy pandas UDF-et futtat úgy, hogy az oszlopokat kötegekre osztja fel, és meghívja az egyes kötegek függvényét az adatok részhalmazaként, majd összefűzve az eredményeket.

Az alábbi példa bemutatja, hogyan hozhat létre egy pandas UDF-t, amely 2 oszlop szorzatát számítja ki.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Sorozat iterátora az UDF sorozat iterátorához

Az iterátor UDF ugyanaz, mint a skaláris pandas UDF, kivéve:

  • A Python függvény
    • A kötegek iterátorát használja egyetlen bemeneti köteg helyett bemenetként.
    • A kimeneti kötegek iterátorát adja vissza egyetlen kimeneti köteg helyett.
  • Az iterátor teljes kimenetének hosszának meg kell egyeznie a teljes bemenet hosszával.
  • A burkolt pandas UDF bemenetként egyetlen Spark-oszlopot használ.

A Python-típusmutatót a következőként kell megadnia Iterator[pandas.Series] : ->Iterator[pandas.Series].

Ez a pandas UDF akkor hasznos, ha az UDF végrehajtásához valamilyen állapot inicializálására van szükség, például egy gépi tanulási modellfájl betöltését, hogy következtetést alkalmazzon minden bemeneti kötegre.

Az alábbi példa bemutatja, hogyan hozhat létre pandas UDF-et iterátortámogatással.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Több sorozat iterátora az UDF sorozat iterátorához

Az UDF sorozatok iterátorának iterátora hasonló jellemzőkkel és korlátozásokkal rendelkezik, mint az UDF sorozat iterátorához. A megadott függvény a kötegek iterátorát használja, és a kötegek iterátorát adja ki. Akkor is hasznos, ha az UDF-végrehajtáshoz valamilyen állapot inicializálása szükséges.

A különbségek a következők:

  • A mögöttes Python-függvény a pandas Series egy rekordjának iterátorát használja.
  • A burkolt pandas UDF bemenetként több Spark-oszlopot is használ.

A típusmutatókat a következőként adja meg: Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Adatsor a skaláris UDF-hez

A skaláris pandas UDF-jei hasonlóak a Spark összesítő függvényeihez. A skaláris pandas UDF-hez készült sorozat egy vagy több pandas-sorozatból egy skaláris értékre vonatkozó összesítést határoz meg, ahol minden pandas Series egy Spark-oszlopot jelöl. Sorozat használatával skaláris pandas UDF-eket használhat api-kkal, például select: , withColumngroupBy.agg, és pyspark.sql.Window.

A típusmutatót a következőként fejezi ki: pandas.Series, ... ->Any. A visszatérési típusnak primitív adattípusnak kell lennie, és a visszaadott skalár lehet például Python-primitív típus, int vagy float NumPy adattípus, például numpy.int64 vagy numpy.float64. Any ideális esetben egy adott skaláris típusnak kell lennie.

Az ilyen típusú UDF nem támogatja a részleges összesítést, és az egyes csoportok összes adata betöltődik a memóriába.

Az alábbi példa bemutatja, hogyan használhatja az ilyen típusú UDF-t a középérték kiszámításához a , groupByés window a műveletek használatávalselect:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

A részletes használatért lásd: pyspark.sql.functions.pandas_udf.

Használat

Nyíl kötegméretének beállítása

Feljegyzés

Ez a konfiguráció nincs hatással a megosztott hozzáférési móddal és a Databricks Runtime 13.3 LTS 14.2-vel konfigurált számításra.

A Spark adatpartíciói nyílrekord-kötegekké alakulnak, ami ideiglenesen magas memóriahasználathoz vezethet a JVM-ben. A memóriakivételek elkerülése érdekében módosíthatja a Nyílrekord kötegek méretét úgy, hogy a spark.sql.execution.arrow.maxRecordsPerBatch konfigurációt egy egész számra állítja, amely meghatározza az egyes kötegek sorainak maximális számát. Az alapértelmezett érték kötegenként 10 000 rekord. Ha az oszlopok száma nagy, az értéket ennek megfelelően kell módosítani. Ennek a korlátnak a használatával minden adatpartíció 1 vagy több rekordkötegre van osztva feldolgozásra.

Időbélyeg időzóna szemantikával

A Spark belsőleg UTC-értékként tárolja az időbélyegeket, és a megadott időzóna nélkül behozott időbélyegek helyi időként lesznek utc-zé alakítva mikroszekundumos felbontással.

Amikor az időbélyeg-adatok exportálása vagy megjelenítése a Sparkban történik, a munkamenet időzónája az időbélyeg értékeinek honosítására szolgál. A munkamenet időzónája a spark.sql.session.timeZone konfigurációval van beállítva, és alapértelmezés szerint a JVM-rendszer helyi időzónájába van beállítva. A pandas nanoszekundumos felbontással rendelkező típust datetime64 használ, datetime64[ns]oszloponként választható időzónával.

Amikor az időbélyeg-adatokat a Sparkból a pandasba továbbítja, azokat nanoszekundumokká alakítja át, és minden oszlopot a Spark-munkamenet időzónává alakít, majd erre az időzónára honosít, amely eltávolítja az időzónát, és helyi időként jeleníti meg az értékeket. Ez híváskor toPandas() vagy pandas_udf időbélyegoszlopok használatakor fordul elő.

Amikor az időbélyeg-adatok a pandasból a Sparkba kerülnek át, a rendszer UTC mikroszekundummá alakítja az adatokat. Ez akkor fordul elő, ha pandas DataFrame-et hív createDataFrame , vagy amikor egy pandas UDF-ből ad vissza időbélyeget. Ezek a konvertálások automatikusan befejeződnek, hogy a Spark a várt formátumban rendelkezzen adatokkal, ezért ezeket a konvertálásokat nem szükséges saját maga elvégeznie. Minden nanoszekundumérték csonkolt.

A standard UDF időbélyegadatokat tölt be Python datetime objektumként, ami eltér a pandas időbélyegzőjénél. A legjobb teljesítmény érdekében azt javasoljuk, hogy a pandas idősorozat-funkcióit használja a pandas UDF időbélyegeinek használatakor. További részletekért lásd : Idősor/ Dátum funkció.

Példajegyzetfüzet

Az alábbi jegyzetfüzet a pandas UDF-ekkel elérhető teljesítménybeli fejlesztéseket mutatja be:

pandas UDFs benchmark notebook

Jegyzetfüzet beszerzése