Fonctions pandas définies par l’utilisateur

Une fonction définie par l’utilisateur (UDF) pandas, également appelée UDF vectorisée, est une fonction définie par l’utilisateur qui utilise Apache Arrow pour transférer des données et des fonctions pandas pour utiliser les données. Les fonctions définies par l’utilisateur pandas permettent d’effectuer des opérations vectorisées pouvant augmenter les performances jusqu’à 100 x par rapport aux Fonctions définies par l’utilisateur Python qui opèrent une ligne à la fois.

Pour des informations d’arrière-plan, consultez le billet de blog Nouvelles fonctions Pandas définies par l’utilisateur et indicateurs de type Python dans la prochaine version d’Apache Spark 3.0.

Vous définissez une fonction définie par l’utilisateur pandas en utilisant le mot clé pandas_udf en tant qu’élément décoratif, et enveloppez la fonction avec un indicateur de type Python. Cet article décrit les différents types de fonctions définies par l’utilisateur pandas, et montre comment les utiliser avec des indicateurs de type.

Fonction définie par l'utilisateur de série à série

Vous utilisez une fonction définie par l'utilisateur pandas de série à série pour vectoriser des opérations scalaires. Vous pouvez les utiliser avec des API telles que select et withColumn.

La fonction Python doit prendre une série pandas comme entrée et retourner une série pandas de la même longueur, et vous devez les spécifier dans les indicateurs de type Python. Spark exécute une fonction définie par l'utilisateur pandas en fractionnant les colonnes en lots, en appelant la fonction pour chaque lot sous la forme d’un sous-ensemble de données, puis en concaténant les résultats.

L’exemple suivant montre comment créer une fonction définie par l'utilisateur pandas qui calcule le produit de 2 colonnes.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Fonction définie par l'utilisateur d’itérateur de série à itérateur de série

Une fonction définie par l'utilisateur d’itérateur est identique à une fonction définie par l'utilisateur pandas scalaire, à l’exception des considérations suivantes :

  • La fonction Python
    • Prend un itérateur de lots au lieu d’un lot d’entrée unique comme entrée.
    • Retourne un itérateur de lots de sortie au lieu d’un seul lot de sortie.
  • La longueur de l’intégralité de la sortie dans l’itérateur doit être identique à celle de l’entrée entière.
  • La fonction définie par l'utilisateur pandas enveloppée utilise une seule colonne Spark en tant qu’entrée.

Vous devez spécifier l’indicateur de type Python en tant que Iterator[pandas.Series] - >Iterator[pandas.Series].

Cette fonction définie par l'utilisateur pandas est utile lorsque son exécution requiert l’initialisation d’un certain état, par exemple, le chargement d’un fichier de modèle Machine Learning pour appliquer l’inférence à chaque lot d’entrée.

L’exemple suivant montre comment créer une fonction définie par l'utilisateur pandas avec prise en charge de l’itérateur.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Fonction définie par l'utilisateur d’itérateur de plusieurs séries à itérateur de série

Une fonction définie par l'utilisateur d’itérateur de plusieurs séries à itérateur de série présente des caractéristiques et restrictions similaires à celles d’une fonction définie par l'utilisateur d’itérateur de série à itérateur de série. La fonction spécifiée prend un itérateur de lots et produit un itérateur de lots. Elle est également utile quand son exécution requiert l’initialisation d’un État.

Les différences sont les suivantes :

  • La fonction Python sous-jacente prend un itérateur d’un tuple de série pandas.
  • La fonction définie par l'utilisateur pandas enveloppée prend plusieurs colonnes Spark comme entrée.

Vous spécifiez les indicateurs de type comme Iterator[Tuple[pandas.Series, ...]] - >Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Fonction définie par l'utilisateur de série à scalaire

Les fonctions définies par l'utilisateur pandas de série à scalaire sont similaires aux fonctions d’agrégation Spark. Une fonction définie par l'utilisateur pandas de série à scalaire définit une agrégation à partir d’une ou de plusieurs séries pandas à une valeur scalaire, où chaque série pandas représente une colonne Spark. Vous utilisez une fonction définie par l'utilisateur pandas de série à scalaire avec des API telles que select, withColumn, groupBy.agg et pyspark.sql.Window.

Vous exprimez l’indicateur de type en tant que pandas.Series, ... - >Any. Le type de retour doit être un type de données primitif, et la valeur scalaire retournée peut être un type primitif Python tel que int ou float, ou un type de données NumPy tel que numpy.int64 ou numpy.float64. Any devrait idéalement être un type scalaire spécifique.

Ce type de fonction définie par l'utilisateur ne prend pas en charge une agrégation partielle, et toutes les données pour chaque groupe sont chargées en mémoire.

L’exemple suivant montre comment utiliser ce type de fonction définie par l'utilisateur pour calculer une moyenne avec les opérations select, groupBy et window :

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Pour plus d’informations sur l’utilisation, consultez pyspark.sql.functions.pandas_udf.

Usage

Définition de la taille de lot Arrow

Remarque

Cette configuration n’a aucun impact sur le calcul configuré avec le mode d’accès partagé et Databricks Runtime 13.3 LTS à 14.2.

Les partitions de données dans Spark étant converties en lots d’enregistrements Arrow, cela peut conduire temporairement à une utilisation élevée de la mémoire dans la machine virtuelle Java. Pour éviter d’éventuelles exceptions de mémoire insuffisante, vous pouvez ajuster la taille des lots d’enregistrements Arrow en définissant la configuration spark.sql.execution.arrow.maxRecordsPerBatch sur un nombre entier déterminant le nombre maximal de lignes pour chaque lot. La valeur par défaut est de 10 000 enregistrements par lot. Si le nombre de colonnes est élevé, la valeur devrait être ajustée en conséquence. Su la base de cette limite, chaque partition de données est divisée en 1 ou plusieurs lots d’enregistrements pour traitement.

Sémantique de TIMESTAMP WITH TIME ZONE

Spark stocke les timestamps en interne en tant que valeurs UTC, et les données de timestamp introduites sans fuseau horaire spécifié sont converties en heure locale au format UTC avec une résolution d’une microseconde.

Lors de l’exportation de données de timestamp ou de leur affichage dans Spark, le fuseau horaire de la session est utilisé pour localiser les valeurs de timestamp. Le fuseau horaire de la session est défini avec la configuration spark.sql.session.timeZone. Par défaut, ils s’agit du fuseau horaire local système de la machine virtuelle Java. pandas utilise un type datetime64 avec une résolution d’une nanoseconde, datetime64[ns], avec un fuseau horaire facultatif pour chaque colonne.

Lorsque des données de timestamp sont transférées de Spark vers pandas, elles sont converties en nanosecondes et chaque colonne est convertie dans le fuseau horaire de la session Spark, puis localisée dans ce fuseau, ce qui a pour effet de supprimer le fuseau horaire et d’afficher les valeurs en heure locale. Cela se produit lors de l’appel de toPandas() ou pandas_udf avec des colonnes de timestamp.

Lorsque des données de timestamp sont transférées de pandas vers Spark, elles sont converties en microsecondes UTC. Cela se produit lors de l’appel de createDataFrame avec une trame de données pandas ou lors du retour d’un timestamp à partir d’une fonction définie par l'utilisateur pandas. Ces conversions étant effectuées automatiquement pour s’assurer que Spark dispose de données au format attendu, il n’est pas nécessaire d’effectuer ces conversions manuellement. Toutes valeur en nanosecondes est tronquée.

Une fonction définie par l'utilisateur standard charge les données de timestamp en tant qu’objets dateheure Python, ce qui diffère d’un timestamp pandas. Pour obtenir des performances optimales, nous vous recommandons d’utiliser la fonctionnalité de série chronologique pandas lorsque vous utilisez des timestamps dans une fonction définie par l'utilisateur pandas. Pour plus d’informations, consultez Série chronologique / fonctionnalité Date.

Exemple de bloc-notes

Le notebook suivant illustre les améliorations de performances que vous pouvez obtenir avec des fonctions définies par l’utilisateur pandas :

notebook de banc d’essai de fonction définie par l'utilisateur pandas

Obtenir le notebook