Delen via


Pandas-functie-API's

Met pandas-functie-API's kunt u rechtstreeks een systeemeigen Python-functie toepassen die pandas-exemplaren gebruikt en uitvoert naar een PySpark DataFrame. Net als bij door de gebruiker gedefinieerde pandas-functies gebruiken functie-API's ook Apache Arrow om gegevens en pandas over te dragen om met de gegevens te werken; Python-typehints zijn echter optioneel in pandas-functie-API's.

Er zijn drie typen pandas-functie-API's:

  • Gegroepeerde kaart
  • Kaart
  • Gegroepeerde kaart

Pandas-functie-API's maken gebruik van dezelfde interne logica die pandas UDF-uitvoering gebruikt. Ze delen kenmerken zoals PyArrow, ondersteunde SQL-typen en de configuraties.

Zie het blogbericht New Pandas UDF's and Python Type Hints in the Upcoming Release of Apache Spark 3.0 (Engelstalig) voor meer informatie.

Gegroepeerde kaart

U transformeert uw gegroepeerde gegevens met behulp van groupBy().applyInPandas() om het patroon 'splitsen-toepassen-combineren' te implementeren. Split-apply-combine bestaat uit drie stappen:

  • Splits de gegevens in groepen met behulp van DataFrame.groupBy.
  • Een functie toepassen op elke groep. De invoer en uitvoer van de functie zijn beide pandas.DataFrame. De invoergegevens bevatten alle rijen en kolommen voor elke groep.
  • Combineer de resultaten in een nieuwe DataFrame.

Als u wilt gebruiken groupBy().applyInPandas(), moet u het volgende definiƫren:

  • Een Python-functie die de berekening voor elke groep definieert
  • Een StructType object of tekenreeks die het schema van de uitvoer definieert DataFrame

De kolomlabels van de geretourneerde pandas.DataFrame moeten overeenkomen met de veldnamen in het gedefinieerde uitvoerschema als deze zijn opgegeven als tekenreeksen, of overeenkomen met de veldgegevenstypen op positie, indien niet tekenreeksen, bijvoorbeeld indexen met gehele getallen. Zie pandas. DataFrame voor het labelen van kolommen bij het maken van een pandas.DataFrame.

Alle gegevens voor een groep worden in het geheugen geladen voordat de functie wordt toegepast. Dit kan leiden tot uitzonderingen voor onvoldoende geheugen, met name als de groepsgrootten scheef zijn. De configuratie voor maxRecordsPerBatch wordt niet toegepast op groepen en het is aan u om ervoor te zorgen dat de gegroepeerde gegevens in het beschikbare geheugen passen.

In het volgende voorbeeld ziet u hoe u groupby().apply() gebruikt om het gemiddelde af te trekken van elke waarde in de groep.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Zie pyspark.sql.GroupedData.applyInPandas voor gedetailleerd gebruik.

Kaart

U voert toewijzingsbewerkingen uit met pandas-exemplaren door DataFrame.mapInPandas() om een iterator van pandas.DataFrame te transformeren naar een andere iterator van pandas.DataFrame die het huidige PySpark DataFrame vertegenwoordigt en het resultaat retourneert als een PySpark DataFrame.

De onderliggende functie gebruikt en voert een iterator van pandas.DataFrameuit. Het kan uitvoer van willekeurige lengte retourneren in tegenstelling tot sommige pandas-UDF's, zoals reeks naar reeks.

In het volgende voorbeeld ziet u hoe u mapInPandas()gebruikt:

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Zie pyspark.sql.DataFrame.mapInPandas voor gedetailleerd gebruik.

Gegroepeerde kaart

Gebruik voor gegroepeerde toewijzingsbewerkingen met pandas-exemplaren DataFrame.groupby().cogroup().applyInPandas() om twee PySpark DataFrames te groeperen met een gemeenschappelijke sleutel en vervolgens een Python-functie toe te passen op elke cogroep, zoals wordt weergegeven:

  • De gegevens zodanig verdelen dat de groepen van elk DataFrame die een sleutel delen, samen worden gegroepeerd.
  • Een functie toepassen op elke cogroep. De invoer van de functie is twee pandas.DataFrame (met een optionele tuple die de sleutel vertegenwoordigt). De uitvoer van de functie is een pandas.DataFrame.
  • Combineer de pandas.DataFrames van alle groepen in een nieuwe PySpark DataFrame.

Als u wilt gebruiken groupBy().cogroup().applyInPandas(), moet u het volgende definiƫren:

  • Een Python-functie waarmee de berekening voor elke cogroep wordt gedefinieerd.
  • Een StructType object of een tekenreeks die het schema van de pySpark-uitvoer DataFramedefinieert.

De kolomlabels van de geretourneerde pandas.DataFrame moeten overeenkomen met de veldnamen in het gedefinieerde uitvoerschema als deze zijn opgegeven als tekenreeksen, of overeenkomen met de veldgegevenstypen op positie, indien niet tekenreeksen, bijvoorbeeld indexen met gehele getallen. Zie pandas. DataFrame voor het labelen van kolommen bij het maken van een pandas.DataFrame.

Alle gegevens voor een cogroep worden in het geheugen geladen voordat de functie wordt toegepast. Dit kan leiden tot uitzonderingen voor onvoldoende geheugen, met name als de groepsgrootten scheef zijn. De configuratie voor maxRecordsPerBatch wordt niet toegepast en het is aan u om ervoor te zorgen dat de gegroepeerde gegevens in het beschikbare geheugen passen.

In het volgende voorbeeld ziet u hoe u groupby().cogroup().applyInPandas() een tussen twee gegevenssets kunt uitvoeren asof join .

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Zie pyspark.sql.PandasCogroupedOps.applyInPandas voor gedetailleerd gebruik.