Condividi tramite


Funzioni Pandas definite dall'utente

Una funzione definita dall'utente pandas, nota anche come funzione definita dall'utente vettorializzata, è una funzione definita dall'utente che usa Apache Arrow per trasferire dati e pandas per lavorare con i dati. Le funzioni definite dall'utente pandas consentono operazioni vettorializzate che possono aumentare le prestazioni fino a 100 volte rispetto alle funzioni definite dall'utente Python in fase di esecuzione.

Per informazioni di base, vedere il post di blog New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0 (Nuove funzioni definite dall'utente Pandas e hint per i tipi Python nella prossima versione di Apache Spark 3.0).

È possibile definire una funzione definita dall'utente pandas usando la parola chiave pandas_udf come elemento Decorator ed eseguire il wrapping della funzione con un hint per il tipo Python. Questo articolo descrive i diversi tipi di funzioni definite dall'utente pandas e illustra come usare le funzioni definite dall'utente pandas con hint di tipo.

Funzione definita dall'utente da serie a serie

Per vettorizzare le operazioni scalari, usare una funzione definita dall'utente pandas series to Series to Series. È possibile usarli con API come select e withColumn.

La funzione Python deve accettare una serie pandas come input e restituire una serie pandas con la stessa lunghezza ed è necessario specificarli negli hint per il tipo Python. Spark esegue una funzione definita dall'utente pandas suddividendo le colonne in batch, chiamando la funzione per ogni batch come subset dei dati, quindi concatenando i risultati.

L'esempio seguente illustra come creare una funzione definita dall'utente pandas che calcola il prodotto di 2 colonne.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Iteratore di serie in iteratore della funzione definita dall'utente della serie

Una funzione definita dall'utente iteratore è identica a una funzione definita dall'utente pandas scalare, ad eccezione di:

  • Funzione Python
    • Accetta un iteratore di batch anziché un singolo batch di input come input.
    • Restituisce un iteratore di batch di output anziché un singolo batch di output.
  • La lunghezza dell'intero output nell'iteratore deve corrispondere alla lunghezza dell'intero input.
  • La funzione definita dall'utente pandas di cui è stato eseguito il wrapping accetta una singola colonna Spark come input.

È necessario specificare l'hint per il tipo Python come Iterator[pandas.Series] ->Iterator[pandas.Series].

Questa funzione definita dall'utente pandas è utile quando l'esecuzione della funzione definita dall'utente richiede l'inizializzazione di uno stato, ad esempio il caricamento di un file del modello di Machine Learning per applicare l'inferenza a ogni batch di input.

L'esempio seguente illustra come creare una funzione definita dall'utente pandas con supporto iteratore.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Iteratore di più serie in iteratore della funzione definita dall'utente della serie

Un iteratore di più serie per iteratore di funzioni definite dall'utente della serie presenta caratteristiche e restrizioni simili come iteratore di serie a iteratore di funzioni definite dall'utente della serie. La funzione specificata accetta un iteratore di batch e restituisce un iteratore di batch. È utile anche quando l'esecuzione della funzione definita dall'utente richiede l'inizializzazione di uno stato.

Le differenze sono le seguenti:

  • La funzione Python sottostante accetta un iteratore di una tupla di serie pandas.
  • La funzione definita dall'utente pandas di cui è stato eseguito il wrapping accetta più colonne Spark come input.

Specificare gli hint di tipo come Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Da serie a funzioni definite dall'utente scalari

Le funzioni di aggregazione da serie a pandas scalari sono simili alle funzioni di aggregazione Spark. Una funzione definita dall'utente da serie a pandas scalare definisce un'aggregazione da una o più serie pandas a un valore scalare, in cui ogni serie pandas rappresenta una colonna Spark. Usare una serie per scalare la funzione definita dall'utente pandas con API come select, withColumn, groupBy.agge pyspark.sql.Window.

Si esprime l'hint di tipo come pandas.Series, ... ->Any. Il tipo restituito deve essere un tipo di dati primitivo e il scalare restituito può essere un tipo primitivo Python, ad esempio o float un tipo di dati NumPy, ad numpy.int64 esempio int o numpy.float64. Any idealmente deve essere un tipo scalare specifico.

Questo tipo di funzione definita dall'utente non supporta l'aggregazione parziale e tutti i dati per ogni gruppo vengono caricati in memoria.

L'esempio seguente illustra come usare questo tipo di funzione definita dall'utente per calcolare la media con selectle operazioni , groupBye window :

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.functions.pandas_udf.

Utilizzo

Impostazione delle dimensioni del batch freccia

Nota

Questa configurazione non ha alcun impatto sul calcolo configurato con la modalità di accesso condiviso e Databricks Runtime da 13.3 LTS a 14.2.

Le partizioni di dati in Spark vengono convertite in batch di record Arrow, che possono causare temporaneamente un utilizzo elevato della memoria nella JVM. Per evitare possibili eccezioni di memoria insufficiente, è possibile modificare le dimensioni dei batch di record Freccia impostando la spark.sql.execution.arrow.maxRecordsPerBatch configurazione su un numero intero che determina il numero massimo di righe per ogni batch. Il valore predefinito è 10.000 record per batch. Se il numero di colonne è elevato, il valore deve essere regolato di conseguenza. Usando questo limite, ogni partizione di dati viene suddivisa in 1 o più batch di record per l'elaborazione.

Timestamp con semantica del fuso orario

Spark archivia internamente i timestamp come valori UTC e i dati di timestamp inseriti senza un fuso orario specificato vengono convertiti come ora locale in utc con risoluzione microseconda.

Quando i dati di timestamp vengono esportati o visualizzati in Spark, il fuso orario della sessione viene usato per localizzare i valori del timestamp. Il fuso orario della sessione viene impostato con la spark.sql.session.timeZone configurazione e per impostazione predefinita viene impostato il fuso orario locale del sistema JVM. Pandas usa un datetime64 tipo con risoluzione nanosecondo, datetime64[ns], con fuso orario facoltativo per colonna.

Quando i dati timestamp vengono trasferiti da Spark a pandas, vengono convertiti in nanosecondi e ogni colonna viene convertita nel fuso orario della sessione Spark, quindi localizzata in tale fuso orario, che rimuove il fuso orario e visualizza i valori come ora locale. Ciò si verifica quando si chiama toPandas() o pandas_udf con colonne timestamp.

Quando i dati timestamp vengono trasferiti da pandas a Spark, vengono convertiti in microsecondi UTC. Ciò si verifica quando si chiama createDataFrame con un dataframe pandas o quando si restituisce un timestamp da una funzione definita dall'utente pandas. Queste conversioni vengono eseguite automaticamente per garantire che Spark abbia dati nel formato previsto, quindi non è necessario eseguire manualmente alcuna di queste conversioni. I valori nanosecondi vengono troncati.

Una funzione definita dall'utente standard carica i dati di timestamp come oggetti datetime Python, che è diverso da un timestamp pandas. Per ottenere prestazioni ottimali, è consigliabile usare la funzionalità della serie temporale pandas quando si usano i timestamp in una funzione definita dall'utente pandas. Per informazioni dettagliate, vedere Funzionalità Time Series/Date.

Notebook di esempio

Il notebook seguente illustra i miglioramenti delle prestazioni che è possibile ottenere con le funzioni definite dall'utente pandas:

Notebook benchmark delle funzioni definite dall'utente pandas

Ottenere il notebook