Compartir a través de


Funciones definidas por el usuario de Pandas

Una función definida por el usuario (UDF) de Pandas, también conocida como UDF vectorizada, es una función que usa Apache Arrow para transferir datos y Pandas para trabajar con ellos. Las UDF de Pandas permiten operaciones vectorizadas que pueden aumentar el rendimiento hasta 100 veces en comparación con las UDF de Python que van de una en una.

Si desea entrar en detalles, consulte la entrada de blog New Pandas UDF and Python Type Hints in the Upcoming Release of Apache Spark 3.0 (Nuevas UDF de Pandas y sugerencias de tipos de Python en la próxima versión de Apache Spark 3.0).

Defina una UDF de Pandas mediante la palabra clave pandas_udf como decorador y ajuste la función con una sugerencia de tipo de Python. En este artículo se describen los diferentes tipos de UDF de Pandas y se muestra cómo usarlas con sugerencias de tipo.

UDF de serie a serie

Puede usar una UDF de Pandas de serie a serie para vectorizar las operaciones escalares. Puede usarlas con API como select y withColumn.

La función de Python debe tomar una serie de Pandas como entrada y devolver otra de la misma longitud, y debe especificarlas en las sugerencias de tipo de Python. Spark ejecuta una UDF de Pandas dividiendo las columnas en lotes, llamando a la función de cada lote como un subconjunto de los datos y concatenando los resultados.

En el ejemplo siguiente se muestra cómo crear una UDF de Pandas que calcula el producto de 2 columnas.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

UDF de iterador de serie a iterador de serie

Una UDF de iterador es la misma que una UDF escalar de Pandas, excepto en que:

  • La función de Python
    • Toma como entrada un iterador de lotes en lugar de un único lote de entrada.
    • Devuelve un iterador de lotes de salida en lugar de un único lote de salida.
  • La longitud de toda la salida del iterador debe ser la misma que la longitud de toda la entrada.
  • La UDF encapsulada de Pandas toma una sola columna de Spark como entrada.

Debe especificar la sugerencia de tipo de Python como Iterator[pandas.Series] ->Iterator[pandas.Series].

Esta UDF de Pandas es útil cuando la ejecución de UDF requiere inicializar algún estado, por ejemplo, cargar un archivo de modelo de Machine Learning para aplicar la inferencia a cada lote de entrada.

En el ejemplo siguiente se muestra cómo crear una UDF de Pandas con compatibilidad con iteradores.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

UDF de iterador de varias series a iterador de serie

Una UDF de iterador de varias series a iterador de serie tiene características y restricciones similares a la UDF de iterador de serie a iterador de serie. La función especificada toma un iterador de lotes y genera otro. También es útil cuando la ejecución de UDF requiere inicializar algún estado.

Las diferencias son:

  • La función subyacente de Python toma un iterador de una tupla de la serie de Pandas.
  • La UDF encapsulada de Pandas toma varias columnas de Spark como entrada.

Puede especificar las sugerencias de tipo como Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

UDF de serie a escalar

Las UDF de Pandas de serie a escalar son similares a las funciones de agregado de Spark. Una UDF de Pandas de serie a escalar define una agregación de una o varias series de Pandas a un valor escalar, en la que cada serie de Pandas representa una columna de Spark. Puede usar una UDF de Pandas de serie a escalar con API como select, withColumn, groupBy.agg y pyspark.sql.Window.

Puede expresar la sugerencia de tipo como pandas.Series, ... ->Any. El tipo de valor devuelto debe ser un tipo de datos primitivo y el escalar devuelto puede ser un tipo primitivo de Python, por ejemplo, int o float, o un tipo de datos NumPy como numpy.int64 o numpy.float64. Any debería ser idealmente un tipo escalar específico.

Este tipo de UDF no admite la agregación parcial y todos los datos de cada grupo se cargan en la memoria.

En el ejemplo siguiente se muestra cómo usar este tipo de UDF para calcular la media con las operaciones select, groupBy y window:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Para obtener un uso detallado, consulte pyspark.sql.functions.pandas_udf.

Uso

Establecimiento del tamaño del lote de Arrow

Nota:

Esta configuración no afecta al proceso configurado con el modo de acceso compartido y Databricks Runtime 13.3 LTS a 14.2.

Las particiones de datos de Spark se convierten en lotes de registros de Arrow, lo que puede provocar temporalmente un uso elevado de memoria en la máquina virtual Java. Para evitar posibles excepciones de memoria insuficiente, puede ajustar el tamaño de los lotes de registros de flecha estableciendo la configuración spark.sql.execution.arrow.maxRecordsPerBatch en un entero que determine el número máximo de filas para cada lote. El valor predeterminado es 10 000 registros por lote. Si el número de columnas es grande, el valor debe ajustarse en consecuencia. Con este límite, cada partición de datos se divide en 1 o más lotes de registros para su procesamiento.

Marca de tiempo con semántica de zona horaria

Spark almacena internamente las marcas de tiempo como valores UTC y los datos de marca de tiempo que entran sin una zona horaria especificada se convierten como hora local a UTC con una resolución de microsegundos.

Cuando los datos de marca de tiempo se exportan o se muestran en Spark, la zona horaria de la sesión se usa para localizar los valores de marca de tiempo. La zona horaria de la sesión se establece con la configuración spark.sql.session.timeZone y el valor predeterminado es la zona horaria local del sistema de la máquina virtual Java. Pandas usa un tipo datetime64 con resolución de nanosegundos, datetime64[ns], con zona horaria opcional por columna.

Cuando los datos de marca de tiempo se transfieren de Spark a Pandas, se convierten en nanosegundos y cada columna se convierte a la zona horaria de la sesión de Spark y luego se traduce a esa zona horaria, lo que elimina la zona horaria y muestra los valores como hora local. Esto sucede cuando se llama a toPandas() o pandas_udf con columnas de marca de tiempo.

Cuando los datos de marca de tiempo se transfieren de Pandas a Spark, se convierten a microsegundos UTC. Esto ocurre al llamar a createDataFrame con un dataframe de Pandas o al devolver una marca de tiempo de una UDF de Pandas. Estas conversiones se realizan automáticamente para asegurarse de que Spark tenga datos en el formato esperado, por lo que no es necesario que las realice el usuario. Los valores en nanosegundos se truncan.

Una UDF estándar carga los datos de marca de tiempo como objetos datetime de Python, que es diferente de una marca de tiempo de Pandas. Para conseguir el mejor rendimiento, se recomienda usar la funcionalidad de serie temporal de Pandas al trabajar con marcas de tiempo en una UDF de Pandas. Para más información, consulte Funcionalidad de serie temporal y fecha.

Cuaderno de ejemplo

En el cuaderno siguiente se muestran las mejoras de rendimiento que puede lograr con las UDF de Pandas:

Cuaderno de pruebas comparativas de UDF de Pandas

Obtener el cuaderno