Megosztás a következőn keresztül:


pandas függvény API-k

A pandas függvény API-k lehetővé teszik, hogy közvetlenül alkalmazzon egy Python natív függvényt, amely pandas-példányokat vesz fel és ad ki egy PySpark DataFrame-hez. Hasonlóan a pandas felhasználó által definiált függvényekhez, a függvény API-k is az Apache Arrow-t használják az adatok átvitelére, a pandas pedig az adatok kezelésére; a Python-típusjelzések azonban nem kötelezőek a pandas függvény API-jaiban.

A pandas függvény API-jainak három típusa létezik:

  • Csoportosított térkép
  • Térkép
  • Csoportosított térkép

A pandas függvény API-k ugyanazt a belső logikát használják, amelyet a pandas UDF-végrehajtás használ. Olyan jellemzőkkel rendelkeznek, mint a PyArrow, a támogatott SQL-típusok és a konfigurációk.

További információt az Apache Spark 3.0 közelgő kiadásában New Pandas UDFs és Python Type Hints című blogbejegyzésbentalál.

Csoportosított térkép

A csoportosított adatokat a "split-apply-combine" minta megvalósítása érdekében a groupBy().applyInPandas() segítségével alakítja át. A „split-apply-combine” három lépésből áll:

  • Ossza fel az adatokat csoportokra a DataFrame.groupByhasználatával.
  • Alkalmazz egy függvényt minden csoportra. A függvény bemenete és kimenete egyaránt pandas.DataFrame. A bemeneti adatok az egyes csoportok összes sorát és oszlopát tartalmazzák.
  • Egyesítse az eredményeket egy új DataFrame.

A groupBy().applyInPandas()használatához a következőket kell megadnia:

  • Egy Python-függvény, amely meghatározza az egyes csoportok számítását
  • A kimenet StructType sémáját meghatározó DataFrame objektum vagy karakterlánc

A visszaadott pandas.DataFrame oszlopcímkéinek meg kell egyezniük a megadott kimeneti sémában szereplő mezőnevekkel, ha sztringként vannak megadva, vagy a mezők adattípusaival pozíció szerint, ha nem sztringek, például egész számok. Lásd pandas. A DataFrame az oszlopok címkézésére pandas.DataFramelétrehozásakor.

Egy csoport összes adata betöltődik a memóriába a függvény alkalmazása előtt. Ez a memória kivételek kiváltásához vezethet, különösen akkor, ha a csoportméretek el vannak tolódva. A maxRecordsPerBatch konfigurációja nincs alkalmazva a csoportokra, és Önnek kell gondoskodnia arról, hogy a csoportosított adatok elférjenek a rendelkezésre álló memóriában.

Az alábbi példa bemutatja, hogyan vonhatja ki a középértéket a csoport minden egyes értékéből a groupby().apply() használatával.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

A részletes használatért lásd: pyspark.sql.GroupedData.applyInPandas.

Térkép

A pandas-példányokkal végzett térképműveleteket a DataFrame.mapInPandas() hajtja végre annak érdekében, hogy a pandas.DataFrame iterátorát a pandas.DataFrame egy másik iterátorává alakítsa, amely az aktuális PySpark DataFrame-et jelöli, és az eredményt PySpark DataFrame-ként adja vissza.

A mögöttes függvény a pandas.DataFrameiterátorát veszi fel és adja ki. Tetszőleges hosszúságú kimenetet adhat vissza, ellentétben bizonyos pandas UDF-ekkel, mint például a sorozatról sorozatra típusú.

Az alábbi példa a mapInPandas()használatát mutatja be:

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

A részletes használatért lásd: pyspark.sql.DataFrame.mapInPandas.

Csoportosított térkép

Pandas példányokkal végzett cogrouping térképműveletek esetén használja a DataFrame.groupby().cogroup().applyInPandas()-t két PySpark DataFramecsoportosítására közös kulcs alapján, majd alkalmazzon egy Python-függvényt az egyes csoportokra az alábbi módon.

  • Rendezd át az adatokat úgy, hogy azok az adatkeret csoportok, amelyek kulcsot osztanak meg, együtt legyenek társítva.
  • Függvény alkalmazása minden kogroupra. A függvény bemenete két pandas.DataFrame, és opcionálisan egy kulcsot ábrázoló tömb lehet. A függvény kimenete egy pandas.DataFrame.
  • Egyesítse az összes csoport pandas.DataFrame-ját egy új PySpark DataFrame-be.

A groupBy().cogroup().applyInPandas()használatához a következőket kell megadnia:

  • Egy Python-függvény, amely meghatározza az egyes csoportok számítását.
  • Egy StructType objektum vagy egy sztring, amely meghatározza a pySpark kimeneti DataFramesémáját.

A visszaadott pandas.DataFrame oszlopcímkéinek meg kell egyezniük a megadott kimeneti sémában szereplő mezőnevekkel, ha sztringként vannak megadva, vagy a mezők adattípusaival pozíció szerint, ha nem sztringek, például egész számok. Lásd pandas. A DataFrame az oszlopok címkézésére pandas.DataFramelétrehozásakor.

A csoport összes adata betöltődik a memóriába a függvény alkalmazása előtt. Ez a memória kivételek kiváltásához vezethet, különösen akkor, ha a csoportméretek el vannak tolódva. A maxRecordsPerBatch konfigurációja nincs alkalmazva, és Önnek kell gondoskodnia arról, hogy a csoportosított adatok elférjenek a rendelkezésre álló memóriában.

Az alábbi példa bemutatja, hogyan hajthat végre groupby().cogroup().applyInPandas() két adathalmaz között asof join.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

A részletes használatért lásd: pyspark.sql.PandasCogroupedOps.applyInPandas.