Dela via


API:er för pandas-funktioner

Med pandas-funktions-API:er kan du direkt tillämpa en intern Python-funktion som tar och matar ut Pandas-instanser till en PySpark DataFrame. Precis som användardefinierade funktioner i Pandas använder funktions-API:er även Apache Arrow för att överföra data och Pandas för att arbeta med data. Python-typtips är dock valfria i PANDAS-funktions-API:er.

Det finns tre typer av API:er för Pandas-funktioner:

  • Grupperad karta
  • Karta
  • Samordnad karta

PANDAS-funktions-API:er använder samma interna logik som pandas UDF-körning använder. De delar egenskaper som PyArrow, SQL-typer som stöds och konfigurationer.

Mer information finns i blogginlägget Nya Pandas UDF:er och Python-typtips i den kommande versionen av Apache Spark 3.0.

Grupperad karta

Du transformerar dina grupperade data med för groupBy().applyInPandas() att implementera mönstret "split-apply-combine". Split-apply-combine består av tre steg:

  • Dela upp data i grupper med hjälp DataFrame.groupByav .
  • Tillämpa en funktion på varje grupp. Indata och utdata för funktionen är båda pandas.DataFrame. Indata innehåller alla rader och kolumner för varje grupp.
  • Kombinera resultaten till en ny DataFrame.

Om du vill använda groupBy().applyInPandas()måste du definiera följande:

  • En Python-funktion som definierar beräkningen för varje grupp
  • Ett StructType objekt eller en sträng som definierar schemat för utdata DataFrame

Kolumnetiketterna för de returnerade pandas.DataFrame måste antingen matcha fältnamnen i det definierade utdataschemat om de anges som strängar, eller matcha fältdatatyperna efter position om inte strängar, till exempel heltalsindex. Se Pandas. DataFrame för hur du etiketterar kolumner när du skapar en pandas.DataFrame.

Alla data för en grupp läses in i minnet innan funktionen används. Detta kan leda till minnesfel, särskilt om gruppstorlekarna är skeva. Konfigurationen för maxRecordsPerBatch tillämpas inte på grupper och det är upp till dig att se till att grupperade data passar in i det tillgängliga minnet.

I följande exempel visas hur du använder groupby().apply() för att subtrahera medelvärdet från varje värde i gruppen.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Detaljerad användning finns i pyspark.sql.GroupedData.applyInPandas.

Karta

Du utför kartåtgärder med Pandas-instanser genom DataFrame.mapInPandas() att transformera en iterator pandas.DataFrame av till en annan iterator pandas.DataFrame som representerar den aktuella PySpark DataFrame och returnerar resultatet som en PySpark DataFrame.

Den underliggande funktionen tar och matar ut en iterator av pandas.DataFrame. Det kan returnera utdata av godtycklig längd i motsats till vissa Pandas UDF:er, till exempel Serie till serie.

I följande exempel visas hur du använder mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Detaljerad användning finns i pyspark.sql.DataFrame.mapInPandas.

Samordnad karta

För grupperade kartåtgärder med Pandas-instanser använder DataFrame.groupby().cogroup().applyInPandas() du för att gruppera två PySpark DataFrames med en gemensam nyckel och sedan tillämpa en Python-funktion på varje grupp enligt följande:

  • Blanda data så att grupperna för varje DataFrame som delar en nyckel grupperas tillsammans.
  • Tillämpa en funktion på varje grupp. Indata för funktionen är två pandas.DataFrame (med en valfri tuppeln som representerar nyckeln). Funktionens utdata är en pandas.DataFrame.
  • pandas.DataFrameKombinera s från alla grupper till en ny PySpark DataFrame.

Om du vill använda groupBy().cogroup().applyInPandas()måste du definiera följande:

  • En Python-funktion som definierar beräkningen för varje grupp.
  • Ett StructType objekt eller en sträng som definierar schemat för PySpark-utdata DataFrame.

Kolumnetiketterna för de returnerade pandas.DataFrame måste antingen matcha fältnamnen i det definierade utdataschemat om de anges som strängar, eller matcha fältdatatyperna efter position om inte strängar, till exempel heltalsindex. Se Pandas. DataFrame för hur du etiketterar kolumner när du skapar en pandas.DataFrame.

Alla data för en grupp läses in i minnet innan funktionen används. Detta kan leda till minnesfel, särskilt om gruppstorlekarna är skeva. Konfigurationen för maxRecordsPerBatch tillämpas inte och det är upp till dig att se till att de grupperade data passar in i det tillgängliga minnet.

I följande exempel visas hur du använder groupby().cogroup().applyInPandas() för att utföra en asof join mellan två datauppsättningar.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Detaljerad användning finns i pyspark.sql.PandasCogroupedOps.applyInPandas.