Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Una funzione definita dall'utente pandas (UDF)—nota anche come UDF vettoriale—è una funzione definita dall'utente che usa Apache Arrow per trasferire dati e utilizza pandas per l'elaborazione dei dati. Le UDF pandas consentono operazioni vettoriali che possono aumentare le prestazioni fino a 100 volte rispetto alle UDF Python riga per riga.
Per informazioni di base, vedere il post del blog Nuove UDF pandas e suggerimenti per i tipi Python nella prossima versione di Apache Spark 3.0.
È possibile definire una pandas UDF usando la parola chiave pandas_udf come decoratore e incapsulare la funzione con un type hint Python.
Questo articolo descrive i diversi tipi di UDF pandas e illustra come utilizzare gli UDF pandas con indicazioni di tipo.
UDF da serie a serie
Per vettorizzare le operazioni scalari, usare una UDF pandas da serie a serie.
È possibile utilizzarle con API come select e withColumn.
La funzione Python deve accettare una serie pandas come input e restituire una serie pandas con la stessa lunghezza. Specificare questi tipi usando annotazioni di tipo in Python. Spark esegue un pandas UDF suddividendo i dati in batch di righe, chiamando la funzione per ogni batch e quindi concatenando i risultati.
L'esempio seguente illustra come creare una funzione definita dall'utente (UDF) di pandas che calcola il prodotto di due colonne.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
UDF da iteratore di serie a iteratore di serie
Una UDF iteratore è identica a una UDF pandas scalare, fatta eccezione per:
- La funzione Python
- Accetta un iteratore di batch come input, anziché un singolo batch di input.
- Restituisce un iteratore di batch di output anziché un singolo batch di output.
- La lunghezza dell'intero output nell'iteratore deve corrispondere alla lunghezza dell'intero input.
- La funzione pandas UDF incapsulata accetta una singola colonna Spark come input.
È necessario specificare l'hint per il tipo Python come Iterator[pandas.Series] ->Iterator[pandas.Series].
Questa pandas UDF è utile quando l'esecuzione della UDF richiede l'inizializzazione di uno stato, ad esempio caricamento di un file del modello di apprendimento automatico per applicare l'inferenza su ogni batch di input.
L'esempio seguente illustra come creare una pandas UDF con supporto per iteratore.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
UDF da iteratore di più serie a iteratore di serie
Una UDF da iteratore di più serie a iteratore di serie presenta caratteristiche e restrizioni simili alla UDF da iteratore di serie a iteratore di serie. La funzione specificata accetta un iteratore di batch e restituisce un iteratore di batch. È utile anche quando l'esecuzione di una UDF richiede l'inizializzazione di uno stato interno.
Le differenze sono le seguenti:
- La funzione Python sottostante accetta un iteratore di una tupla di serie pandas.
- La funzione definita dall'utente pandas di cui è stato eseguito il wrapping accetta più colonne spark come input.
Specificare gli hint di tipo come Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
UDF da serie a scalare
Le Funzioni Definite dall'Utente (UDF) pandas, che convertono da serie a scalare, sono simili alle funzioni di aggregazione di Spark.
Una funzione definita dall'utente di tipo Series a scalare in pandas definisce un'aggregazione da una o più serie pandas a un valore scalare, in cui ogni serie pandas rappresenta una colonna Spark.
Si usa una serie per scalare la funzione definita dall'utente pandas con API come select, withColumn, groupBy.agge pyspark.sql.Window.
Si esprime l'hint di tipo come pandas.Series, ... ->Any. Il tipo restituito deve essere un tipo di dati primitivo e lo scalare restituito può essere un tipo primitivo Python, ad esempio int o float o un tipo di dati NumPy, ad esempio numpy.int64 o numpy.float64.
Any deve essere idealmente un tipo scalare specifico.
Questo tipo di UDF non supporta l'aggregazione parziale e tutti i dati per ogni gruppo vengono caricati in memoria.
L'esempio seguente mostra come usare questo tipo di UDF per calcolare la media con le operazioni select, groupBy e window:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.functions.pandas_udf.
Utilizzo
Impostazione delle dimensioni del batch Arrow
Nota
Questa configurazione non ha alcun impatto sul calcolo configurato con la modalità di accesso standard e Databricks Runtime da 13.3 LTS a 14.2.
Le partizioni di dati in Spark vengono convertite in batch di record Arrow, che possono causare temporaneamente un utilizzo elevato della memoria nella JVM. Per evitare possibili eccezioni di memoria insufficiente, è possibile modificare le dimensioni dei batch di record Arrow impostando la configurazione spark.sql.execution.arrow.maxRecordsPerBatch su un numero intero che determina il numero massimo di righe per ogni batch. Il valore predefinito è 10.000 record per batch. Se il numero di colonne è elevato, il valore deve essere regolato di conseguenza. Usando questo limite, ogni partizione di dati viene suddivisa in 1 o più batch di record per l'elaborazione.
Timestamp con semantica del fuso orario
Spark archivia internamente i timestamp come valori UTC e i dati di timestamp inseriti senza un fuso orario specificato vengono convertiti come ora locale in utc con risoluzione microseconda.
Quando i dati di timestamp vengono esportati o visualizzati in Spark, il fuso orario della sessione viene usato per localizzare i valori del timestamp. Il fuso orario della sessione viene impostato con la configurazione spark.sql.session.timeZone e, se non specificato diversamente, viene utilizzato il fuso orario locale del sistema JVM per impostazione predefinita. Pandas usa un tipo di datetime64 con risoluzione in nanosecondi, datetime64[ns], con fuso orario facoltativo per ogni colonna.
Quando i dati timestamp vengono trasferiti da Spark a pandas, vengono convertiti in nanosecondi e ogni colonna viene convertita nel fuso orario della sessione Spark, quindi localizzata in tale fuso orario, che rimuove il fuso orario e visualizza i valori come ora locale. Ciò si verifica quando si chiama toPandas() o pandas_udf con colonne di timestamp.
Quando i dati timestamp vengono trasferiti da pandas a Spark, vengono convertiti in microsecondi UTC. Ciò si verifica quando si chiama createDataFrame con un dataframe pandas o quando si restituisce un timestamp da una UDF pandas. Queste conversioni vengono eseguite automaticamente per garantire che Spark abbia dati nel formato previsto, quindi non è necessario eseguire manualmente alcuna di queste conversioni. I valori nanosecondi vengono troncati.
Una funzione definita dall'utente standard carica i dati di timestamp come oggetti datetime di Python, che sono diversi dai timestamp di pandas. Per ottenere prestazioni ottimali, è consigliabile usare le funzionalità per le serie temporali di pandas quando si usano i timestamp in una UDF di pandas. Per informazioni dettagliate, vedere Funzionalità serie temporale/data.
Notebook di esempio
Il seguente notebook illustra i miglioramenti delle prestazioni che è possibile ottenere con le pandas UDF.
Notebook benchmark delle UDF pandas
Ottieni taccuino