Partekatu bidez


API de función de Pandas

Las funciones API de pandas permiten aplicar directamente una función nativa de Python que toma y genera instancias de pandas en un DataFrame de PySpark. De forma similar a las funciones definidas por el usuario de Pandas, las API de función también usan Apache Arrow para transferir datos y pandas para trabajar con los datos; sin embargo, las sugerencias de tipo de Python son opcionales en las API de función pandas.

Hay tres tipos de funciones de API de pandas:

  • Mapa agrupado
  • Mapa
  • Mapa coagrupado

Las API de función pandas aprovechan la misma lógica interna que usa la ejecución de UDF de Pandas. Comparten características como PyArrow, tipos de SQL admitidos y las configuraciones.

Para obtener más información, consulte la publicación en el blog Nuevos UDFs de Pandas y sugerencias de tipo en Python en la próxima versión de Apache Spark 3.0.

Mapa agrupado

Transforma tus datos agrupados mediante groupBy().applyInPandas() para implementar el patrón "dividir-aplicar-combinar". Split-apply-combine consta de tres pasos:

  • Divida los datos en grupos mediante DataFrame.groupBy.
  • Aplique una función en cada grupo. La entrada y salida de la función son pandas.DataFrame. Los datos de entrada contienen todas las filas y columnas de cada grupo.
  • Combine los resultados en un nuevo DataFrame.

Para usar groupBy().applyInPandas(), debe definir lo siguiente:

  • Función de Python que define el cálculo de cada grupo
  • Objeto StructType o una cadena que define el esquema de la salida DataFrame

Las etiquetas de columna del devuelto pandas.DataFrame deben coincidir con los nombres de campo del esquema de salida definido si se especifican como cadenas, o coincidir con los tipos de datos de campo por posición si no son cadenas, por ejemplo, índices enteros. Vea Pandas. DataFrame para etiquetar columnas al construir un pandas.DataFrame.

Todos los datos de un grupo se cargan en la memoria antes de aplicar la función. Esto puede provocar excepciones de falta de memoria, especialmente si los tamaños de grupos están sesgados. La configuración de maxRecordsPerBatch no se aplica en grupos y depende de usted asegurarse de que los datos agrupados se ajusten a la memoria disponible.

En el ejemplo siguiente se muestra cómo usar groupby().apply() para restar la media de cada valor del grupo.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Para obtener un uso detallado, consulte pyspark.sql.GroupedData.applyInPandas.

Mapa

Realice operaciones de mapeo con instancias de Pandas DataFrame.mapInPandas() para transformar un iterador de pandas.DataFrame a otro iterador de pandas.DataFrame que represente el DataFrame de PySpark actual y devuelva el resultado como un DataFrame de PySpark.

La función subyacente toma y genera un iterador de pandas.DataFrame. Puede devolver la salida de longitud arbitraria en contraste con algunas UDF de Pandas, como Series a Series.

En el siguiente ejemplo se muestra cómo usar mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Para obtener un uso detallado, consulte pyspark.sql.DataFrame.mapInPandas.

Mapa cogrupado

En el caso de las operaciones de mapa cogrupeadas con instancias de Pandas, use DataFrame.groupby().cogroup().applyInPandas() para agrupar dos PySpark DataFrame mediante una clave común y aplique una función de Python a cada cogrupo como se muestra:

  • Ordene los datos de forma que los grupos de cada DataFrame que comparten una clave se agrupan conjuntamente.
  • Aplique una función a cada cogrupo. La entrada de la función es dos pandas.DataFrame (con una tupla opcional que representa la clave). La salida de la función es un pandas.DataFrame.
  • Combina los pandas.DataFrames de todos los grupos y colócalos en un nuevo PySpark DataFrame.

Para usar groupBy().cogroup().applyInPandas(), debe definir lo siguiente:

  • Función de Python que define el cálculo de cada cogrupo.
  • Objeto StructType o una cadena que define el esquema de la salida PySpark DataFrame.

Las etiquetas de columna del pandas.DataFrame devuelto deben coincidir con los nombres de los campos en el esquema de salida definido si están especificadas como cadenas, o coincidir con los tipos de datos de los campos por posición si no lo son, por ejemplo, índices enteros. Vea Pandas. DataFrame para etiquetar columnas al construir un pandas.DataFrame.

Todos los datos de un cogrupo se cargan en memoria antes de aplicar la función. Esto puede provocar excepciones por falta de memoria, especialmente si los tamaños de grupo presentan sesgo. No se aplica la configuración de maxRecordsPerBatch y es su función asegurarse de que los datos agrupados se ajusten a la memoria disponible.

En el ejemplo siguiente se muestra cómo usar groupby().cogroup().applyInPandas() para realizar un asof join entre dos conjuntos de datos.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Para obtener un uso detallado, consulte pyspark.sql.PandasCogroupedOps.applyInPandas.