Funkce pandas definované uživatelem
Uživatelem definovaná funkce pandas (UDF) (označovaná také jako vektorizovaná funkce definovaná uživatelem) je uživatelem definovaná funkce, která k přenosu dat a knihovny pandas používá Apache Arrow k práci s daty. Funkce definované uživatelem pandas umožňují vektorizované operace, které můžou zvýšit výkon až o 100x v porovnání s uživatelem definovanými uživatelem Pythonu v řádcích a časem.
Základní informace najdete v blogovém příspěvku New Pandas UDF a Python Type Hints v nadcházející verzi Apache Sparku 3.0.
Pomocí klíčového slova pandas_udf
jako dekorátoru definujete UDF knihovny pandas a funkci zabalíte pomocí nápovědy k typu Pythonu.
Tento článek popisuje různé typy funkcí definovaných uživatelem v knihovně pandas a ukazuje, jak používat uživatelem definované funkce pandas s nápovědou k typům.
Řada na řadu UDF
K vektorizaci skalárních operací použijete sadu Series to Series to Series pandas UDF.
Můžete je použít s rozhraními API, jako select
jsou a withColumn
.
Funkce Pythonu by měla jako vstup použít řadu pandas a vrátit sadu pandas Series se stejnou délkou a měli byste ji zadat v nápovědě k typu Pythonu. Spark spustí UDF pandas rozdělením sloupců do dávek, voláním funkce pro každou dávku jako podmnožinou dat a následným zřetězením výsledků.
Následující příklad ukazuje, jak vytvořit UDF pandas, který vypočítá součin 2 sloupců.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
Iterátor řady iterátoru řady UDF
Funkce definovaná uživatelem iterátoru je stejná jako skalární knihovna UDF pandas s výjimkou:
- Funkce Pythonu
- Vezme jako vstup iterátor dávek místo jedné vstupní dávky.
- Vrátí iterátor výstupních dávek místo jedné výstupní dávky.
- Délka celého výstupu v iterátoru by měla být stejná jako délka celého vstupu.
- Zabalená uživatelská funkce pandas přebírá jako vstup jeden sloupec Sparku.
Měli byste zadat nápovědu pro typ Pythonu jako Iterator[pandas.Series]
->Iterator[pandas.Series]
.
Tato funkce UDF knihovny pandas je užitečná, když spuštění UDF vyžaduje inicializaci některého stavu, například načtení souboru modelu strojového učení, aby se použil odvozování pro každou vstupní dávku.
Následující příklad ukazuje, jak vytvořit UDF pandas s podporou iterátoru.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
Iterátor více řad iterátoru UDF řady
Iterátor více řad iterátoru iterátoru UDF řady má podobné vlastnosti a omezení jako Iterátor řad iterátoru řady UDF. Zadaná funkce přebírá iterátor dávek a vypíše iterátor dávek. Je také užitečné, když spuštění UDF vyžaduje inicializaci nějakého stavu.
Rozdíly jsou:
- Základní funkce Pythonu přebírá iterátor řazené kolekce členů knihovny pandas Series.
- Zabalená funkce definovaná uživatelem pandas přebírá jako vstup několik sloupců Sparku.
Nápovědu typu zadáte jako Iterator[Tuple[pandas.Series, ...]]
->Iterator[pandas.Series]
.
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
Řady pro skalární definovanou uživatelem
UDF skalární knihovny pandas se podobají agregačním funkcím Sparku.
A Series to scalar pandas UDF definuje agregaci z jedné nebo více řad pandas na skalární hodnotu, kde každá řada pandas představuje sloupec Spark.
Pomocí řady můžete skalar pandas UDF používat rozhraní API, jako select
jsou , withColumn
groupBy.agg
, a pyspark.sql.Window.
Tip typu vyjadřujete jako pandas.Series, ...
->Any
. Návratový typ by měl být primitivním datovým typem a vrácený skalár může být primitivním typem Pythonu, například nebo float
datovým typem NumPy, int
například numpy.int64
nebo numpy.float64
. Any
v ideálním případě by měl být konkrétním skalárním typem.
Tento typ UDF nepodporuje částečnou agregaci a všechna data pro každou skupinu se načtou do paměti.
Následující příklad ukazuje, jak tento typ UDF použít k výpočtu průměru s select
, groupBy
a window
operace:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
Podrobné informace o využití najdete v tématu pyspark.sql.functions.pandas_udf.
Využití
Nastavení velikosti dávky šipky
Poznámka:
Tato konfigurace nemá žádný vliv na výpočetní výkon nakonfigurovaný pomocí režimu sdíleného přístupu a databricks Runtime 13.3 LTS až 14.2.
Datové oddíly ve Sparku se převedou na dávky záznamů se šipkami, což může dočasně vést k vysokému využití paměti v prostředí JVM. Abyste se vyhnuli možným výjimkám z paměti, můžete upravit velikost dávek se šipkami tak, že nastavíte spark.sql.execution.arrow.maxRecordsPerBatch
konfiguraci na celé číslo, které určuje maximální počet řádků pro každou dávku. Výchozí hodnota je 10 000 záznamů na dávku. Pokud je počet sloupců velký, měla by se hodnota odpovídajícím způsobem upravit. Při použití tohoto limitu se každý datový oddíl rozdělí na 1 nebo více dávek záznamů pro zpracování.
Časové razítko sémantikou časového pásma
Spark interně ukládá časová razítka jako hodnoty UTC a data časového razítka přenesená bez zadaného časového pásma se převedou jako místní čas na UTC s rozlišením mikrosekund.
Při exportu nebo zobrazení dat časového razítka ve Sparku se časové pásmo relace používá k lokalizaci hodnot časového razítka. Časové pásmo relace je nastaveno s spark.sql.session.timeZone
konfigurací a ve výchozím nastavení se nastaví místní časové pásmo systému JVM. Pandas používá datetime64
typ s nanosekundovým rozlišením , datetime64[ns]
s volitelným časovým pásmem na základě sloupce.
Při přenosu dat časového razítka ze Sparku do pandas se převedou na nanosekundy a každý sloupec se převede do časového pásma relace Sparku a pak se lokalizuje do tohoto časového pásma, které odebere časové pásmo a zobrazí hodnoty jako místní čas. K tomu dochází při volání toPandas()
nebo pandas_udf
se sloupci časového razítka.
Při přenosu dat časového razítka z pandas do Sparku se převedou na mikrosekundy UTC. K tomu dochází při volání createDataFrame
s datovým rámcem pandas nebo při vrácení časového razítka z uživatelem definovaného uživatelem pandas. Tyto převody se provádějí automaticky, aby Spark měl data v očekávaném formátu, takže není nutné provádět žádné z těchto převodů sami. Všechny nanosekundové hodnoty jsou zkráceny.
Standardní UDF načte data časového razítka jako objekty data a času Pythonu, které se liší od časového razítka pandas. Pokud chcete dosáhnout nejlepšího výkonu, doporučujeme při práci s časovými razítky v UDF knihovny pandas používat funkci časové řady pandas. Podrobnosti najdete v tématu Funkce časové řady a data.
Příklad poznámkového bloku
Následující poznámkový blok znázorňuje vylepšení výkonu, která můžete dosáhnout pomocí uživatelem definovaných funkcí pandas: