door gebruikers gedefinieerde Pandas-functies
Een door de gebruiker gedefinieerde pandas-functie (UDF) is een door de gebruiker gedefinieerde functie die gebruikmaakt van Apache Arrow om gegevens en pandas over te dragen om met de gegevens te werken. Pandas UDF's maken vectorized bewerkingen mogelijk die de prestaties tot 100x kunnen verhogen in vergelijking met Python UDF's met rij-at-a-time.
Zie het blogbericht New Pandas UDF's en Python Type Hints in de aanstaande release van Apache Spark 3.0 voor achtergrondinformatie.
U definieert een pandas UDF met behulp van het trefwoord pandas_udf
als decorator en verpakt de functie met een Python-typehint.
In dit artikel worden de verschillende typen pandas UDF's beschreven en wordt beschreven hoe u pandas UDF's gebruikt met typehints.
Reeks-naar-serie-UDF
U gebruikt een reeks voor reeks pandas UDF om scalaire bewerkingen te vectoriseren.
U kunt ze gebruiken met API's zoals select
en withColumn
.
De Python-functie moet een pandas-serie als invoer nemen en een pandas-reeks van dezelfde lengte retourneren. U moet deze opgeven in de Hints van het Python-type. Spark voert een pandas UDF uit door kolommen te splitsen in batches, waarbij de functie voor elke batch wordt aangeroepen als een subset van de gegevens en vervolgens de resultaten samenvoegt.
In het volgende voorbeeld ziet u hoe u een pandas UDF maakt waarmee het product van 2 kolommen wordt berekend.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
Iterator van reeks naar Iterator van reeks UDF
Een iterator UDF is hetzelfde als een scalaire pandas UDF, behalve:
- De Python-functie
- Neemt een iterator van batches in plaats van één invoerbatch als invoer.
- Retourneert een iterator van uitvoerbatches in plaats van één uitvoerbatch.
- De lengte van de volledige uitvoer in de iterator moet gelijk zijn aan de lengte van de gehele invoer.
- De ingepakte pandas UDF neemt één Spark-kolom als invoer.
U moet de hint voor het Python-type opgeven als Iterator[pandas.Series]
->Iterator[pandas.Series]
.
Deze pandas UDF is handig wanneer de UDF-uitvoering een bepaalde status moet initialiseren, bijvoorbeeld het laden van een machine learning-modelbestand om deductie toe te passen op elke invoerbatch.
In het volgende voorbeeld ziet u hoe u een pandas UDF maakt met iterator-ondersteuning.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
Iterator van meerdere series naar Iterator of Series UDF
Een Iterator van meerdere reeksen naar Iterator van reeks UDF heeft vergelijkbare kenmerken en beperkingen als Iterator van serie naar Iterator of Series UDF. De opgegeven functie gebruikt een iterator van batches en voert een iterator van batches uit. Het is ook handig wanneer de UDF-uitvoering een bepaalde status moet initialiseren.
De verschillen zijn:
- De onderliggende Python-functie gebruikt een iterator van een tuple van pandas Series.
- De verpakte pandas UDF neemt meerdere Spark-kolommen als invoer.
U geeft de typehints op als Iterator[Tuple[pandas.Series, ...]]
->Iterator[pandas.Series]
.
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
Reeks naar scalaire UDF
Reeksen naar scalaire pandas UDF's zijn vergelijkbaar met statistische Spark-functies.
Een reeks naar scalaire pandas UDF definieert een aggregatie van een of meer pandas-serie naar een scalaire waarde, waarbij elke pandas-serie een Spark-kolom vertegenwoordigt.
U gebruikt een reeks voor scalaire pandas UDF met API's zoals select
, withColumn
groupBy.agg
en pyspark.sql.Window.
U drukt de typehint uit als pandas.Series, ...
->Any
. Het retourtype moet een primitief gegevenstype zijn en de geretourneerde scalaire waarde kan een primitief Python-type zijn, bijvoorbeeld int
of float
een NumPy-gegevenstype, zoals numpy.int64
of numpy.float64
. Any
moet idealiter een specifiek scalaire type zijn.
Dit type UDF biedt geen ondersteuning voor gedeeltelijke aggregatie en alle gegevens voor elke groep worden in het geheugen geladen.
In het volgende voorbeeld ziet u hoe u dit type UDF gebruikt om het gemiddelde te berekenen met select
, groupBy
en window
bewerkingen:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
Zie pyspark.sql.functions.pandas_udf voor gedetailleerd gebruik.
Gebruik
Grootte van pijlbatch instellen
Notitie
Deze configuratie heeft geen invloed op berekeningen die zijn geconfigureerd met de modus voor gedeelde toegang en Databricks Runtime 13.3 LTS tot en met 14.2.
Gegevenspartities in Spark worden geconverteerd naar arrowrecordbatches, wat tijdelijk kan leiden tot een hoog geheugengebruik in de JVM. Als u mogelijke uitzonderingen voor onvoldoende geheugen wilt voorkomen, kunt u de grootte van de pijlrecordbatches aanpassen door de spark.sql.execution.arrow.maxRecordsPerBatch
configuratie in te stellen op een geheel getal dat het maximum aantal rijen voor elke batch bepaalt. De standaardwaarde is 10.000 records per batch. Als het aantal kolommen groot is, moet de waarde dienovereenkomstig worden aangepast. Met deze limiet wordt elke gegevenspartitie onderverdeeld in 1 of meer recordbatches voor verwerking.
Tijdstempel met tijdzone-semantiek
Spark slaat tijdstempels intern op als UTC-waarden en tijdstempelgegevens die zonder een opgegeven tijdzone worden binnengebracht, worden geconverteerd als lokale tijd naar UTC met microsecondenresolutie.
Wanneer tijdstempelgegevens worden geëxporteerd of weergegeven in Spark, wordt de sessietijdzone gebruikt om de tijdstempelwaarden te lokaliseren. De sessietijdzone wordt ingesteld met de spark.sql.session.timeZone
configuratie en wordt standaard ingesteld op de lokale tijdzone van het JVM-systeem. Pandas gebruikt een datetime64
type met nanoseconde resolutie, datetime64[ns]
met optionele tijdzone per kolom.
Wanneer tijdstempelgegevens worden overgebracht van Spark naar pandas, worden deze geconverteerd naar nanoseconden en wordt elke kolom geconverteerd naar de Spark-sessietijdzone en vervolgens gelokaliseerd naar die tijdzone, waardoor de tijdzone wordt verwijderd en waarden worden weergegeven als lokale tijd. Dit gebeurt wanneer u aanroept toPandas()
of pandas_udf
met tijdstempelkolommen.
Wanneer tijdstempelgegevens worden overgedragen van pandas naar Spark, worden deze geconverteerd naar UTC-microseconden. Dit gebeurt bij het aanroepen createDataFrame
met een Pandas DataFrame of bij het retourneren van een tijdstempel van een pandas UDF. Deze conversies worden automatisch uitgevoerd om ervoor te zorgen dat Spark gegevens in de verwachte indeling heeft, dus het is niet nodig om zelf een van deze conversies uit te voeren. Alle nanosecondewaarden worden afgekapt.
Een standaard-UDF laadt tijdstempelgegevens als Python-datum/tijd-objecten, wat anders is dan een pandas-tijdstempel. Om de beste prestaties te krijgen, raden we u aan om pandas-tijdreeksfunctionaliteit te gebruiken bij het werken met tijdstempels in een pandas UDF. Zie de functionaliteit Time Series/Datum voor meer informatie.
Voorbeeld van notebook
Het volgende notebook illustreert de prestatieverbeteringen die u kunt bereiken met pandas UDF's: