API:er för pandas-funktioner
Med pandas-funktions-API:er kan du direkt tillämpa en intern Python-funktion som tar och matar ut Pandas-instanser till en PySpark DataFrame. Precis som användardefinierade funktioner i Pandas använder funktions-API:er även Apache Arrow för att överföra data och Pandas för att arbeta med data. Python-typtips är dock valfria i PANDAS-funktions-API:er.
Det finns tre typer av API:er för Pandas-funktioner:
- Grupperad karta
- Karta
- Samordnad karta
PANDAS-funktions-API:er använder samma interna logik som pandas UDF-körning använder. De delar egenskaper som PyArrow, SQL-typer som stöds och konfigurationer.
Mer information finns i blogginlägget Nya Pandas UDF:er och Python-typtips i den kommande versionen av Apache Spark 3.0.
Grupperad karta
Du transformerar dina grupperade data med för groupBy().applyInPandas()
att implementera mönstret "split-apply-combine". Split-apply-combine består av tre steg:
- Dela upp data i grupper med hjälp
DataFrame.groupBy
av . - Tillämpa en funktion på varje grupp. Indata och utdata för funktionen är båda
pandas.DataFrame
. Indata innehåller alla rader och kolumner för varje grupp. - Kombinera resultaten till en ny
DataFrame
.
Om du vill använda groupBy().applyInPandas()
måste du definiera följande:
- En Python-funktion som definierar beräkningen för varje grupp
- Ett
StructType
objekt eller en sträng som definierar schemat för utdataDataFrame
Kolumnetiketterna för de returnerade pandas.DataFrame
måste antingen matcha fältnamnen i det definierade utdataschemat om de anges som strängar, eller matcha fältdatatyperna efter position om inte strängar, till exempel heltalsindex. Se Pandas. DataFrame för hur du etiketterar kolumner när du skapar en pandas.DataFrame
.
Alla data för en grupp läses in i minnet innan funktionen används. Detta kan leda till minnesfel, särskilt om gruppstorlekarna är skeva. Konfigurationen för maxRecordsPerBatch tillämpas inte på grupper och det är upp till dig att se till att grupperade data passar in i det tillgängliga minnet.
I följande exempel visas hur du använder groupby().apply()
för att subtrahera medelvärdet från varje värde i gruppen.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Detaljerad användning finns i pyspark.sql.GroupedData.applyInPandas.
Karta
Du utför kartåtgärder med Pandas-instanser genom DataFrame.mapInPandas()
att transformera en iterator pandas.DataFrame
av till en annan iterator pandas.DataFrame
som representerar den aktuella PySpark DataFrame och returnerar resultatet som en PySpark DataFrame.
Den underliggande funktionen tar och matar ut en iterator av pandas.DataFrame
. Det kan returnera utdata av godtycklig längd i motsats till vissa Pandas UDF:er, till exempel Serie till serie.
I följande exempel visas hur du använder mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Detaljerad användning finns i pyspark.sql.DataFrame.mapInPandas.
Samordnad karta
För grupperade kartåtgärder med Pandas-instanser använder DataFrame.groupby().cogroup().applyInPandas()
du för att gruppera två PySpark DataFrame
s med en gemensam nyckel och sedan tillämpa en Python-funktion på varje grupp enligt följande:
- Blanda data så att grupperna för varje DataFrame som delar en nyckel grupperas tillsammans.
- Tillämpa en funktion på varje grupp. Indata för funktionen är två
pandas.DataFrame
(med en valfri tuppeln som representerar nyckeln). Funktionens utdata är enpandas.DataFrame
. -
pandas.DataFrame
Kombinera s från alla grupper till en ny PySparkDataFrame
.
Om du vill använda groupBy().cogroup().applyInPandas()
måste du definiera följande:
- En Python-funktion som definierar beräkningen för varje grupp.
- Ett
StructType
objekt eller en sträng som definierar schemat för PySpark-utdataDataFrame
.
Kolumnetiketterna för de returnerade pandas.DataFrame
måste antingen matcha fältnamnen i det definierade utdataschemat om de anges som strängar, eller matcha fältdatatyperna efter position om inte strängar, till exempel heltalsindex. Se Pandas. DataFrame för hur du etiketterar kolumner när du skapar en pandas.DataFrame
.
Alla data för en grupp läses in i minnet innan funktionen används. Detta kan leda till minnesfel, särskilt om gruppstorlekarna är skeva. Konfigurationen för maxRecordsPerBatch tillämpas inte och det är upp till dig att se till att de grupperade data passar in i det tillgängliga minnet.
I följande exempel visas hur du använder groupby().cogroup().applyInPandas()
för att utföra en asof join
mellan två datauppsättningar.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Detaljerad användning finns i pyspark.sql.PandasCogroupedOps.applyInPandas.