Condividi tramite


API della funzione pandas

le API della funzione pandas consentono di applicare direttamente una funzione nativa Python che accetta e restituisce istanze pandas a un dataframe PySpark. Analogamente alle funzioni definite dall'utente pandas, le API di funzione usano anche Apache Arrow per trasferire i dati e pandas per lavorare con i dati; Tuttavia, gli hint di tipo Python sono facoltativi nelle API della funzione pandas.

Esistono tre tipi di API per le funzioni pandas:

  • Mappa raggruppata
  • Mappa
  • Mappa raggruppata

le API della funzione pandas sfruttano la stessa logica interna usata dall'esecuzione di pandas UDF. Condividono caratteristiche come PyArrow, tipi SQL supportati e configurazioni.

Per altre informazioni, vedere il post di blog New Pandas UDFs e Python Type Hints nella prossima versione di Apache Spark 3.0.

Mappa raggruppata

Si trasformano i dati raggruppati usando groupBy().applyInPandas() per implementare il modello "split-apply-combine". La combinazione split-apply è costituita da tre passaggi:

  • Suddividere i dati in gruppi usando DataFrame.groupBy.
  • Applicare una funzione in ogni gruppo. L'input e l'output della funzione sono entrambi pandas.DataFrame. I dati di input contengono tutte le righe e le colonne per ogni gruppo.
  • Combinare i risultati in un nuovo DataFrameoggetto .

Per usare groupBy().applyInPandas(), è necessario definire quanto segue:

  • Funzione Python che definisce il calcolo per ogni gruppo
  • Oggetto StructType o stringa che definisce lo schema dell'output DataFrame

Le etichette di colonna del restituito pandas.DataFrame devono corrispondere ai nomi dei campi nello schema di output definito se specificati come stringhe o corrispondono ai tipi di dati del campo in base alla posizione se non alle stringhe, ad esempio gli indici integer. Vedere pandas. DataFrame per come etichettare le colonne quando si costruisce un pandas.DataFrameoggetto .

Tutti i dati per un gruppo vengono caricati in memoria prima dell'applicazione della funzione. Ciò può causare eccezioni di memoria insufficiente, soprattutto se le dimensioni del gruppo sono asimmetriche. La configurazione per maxRecordsPerBatch non viene applicata ai gruppi ed è necessario assicurarsi che i dati raggruppati si adattino alla memoria disponibile.

Nell'esempio seguente viene illustrato come usare groupby().apply() per sottrarre la media da ogni valore del gruppo.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.GroupedData.applyInPandas.

Mappa

Eseguire operazioni di mapping con istanze DataFrame.mapInPandas() pandas per trasformare un iteratore di pandas.DataFrame in un altro iteratore di pandas.DataFrame che rappresenta il dataframe PySpark corrente e restituisce il risultato come dataframe PySpark.

La funzione sottostante accetta e restituisce un iteratore di pandas.DataFrame. Può restituire l'output di lunghezza arbitraria invece di alcuni pandas UDFs, ad esempio Series to Series.

Nell'esempio seguente viene illustrato come usare mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.DataFrame.mapInPandas.

Mappa raggruppata

Per le operazioni di mapping raggruppate con istanze pandas, usare DataFrame.groupby().cogroup().applyInPandas() per raggruppare due PySpark DataFrames in base a una chiave comune e quindi applicare una funzione Python a ogni gruppo, come illustrato di seguito:

  • Rilegare i dati in modo che i gruppi di ogni dataframe che condividono una chiave siano raggruppati insieme.
  • Applicare una funzione a ogni cogroup. L'input della funzione è due pandas.DataFrame (con una tupla facoltativa che rappresenta la chiave). L'output della funzione è un pandas.DataFrameoggetto .
  • Combinare l's pandas.DataFrameda tutti i gruppi in un nuovo PySpark DataFrame.

Per usare groupBy().cogroup().applyInPandas(), è necessario definire quanto segue:

  • Funzione Python che definisce il calcolo per ogni cogroup.
  • Oggetto StructType o stringa che definisce lo schema dell'output PySpark DataFrame.

Le etichette di colonna del restituito pandas.DataFrame devono corrispondere ai nomi dei campi nello schema di output definito se specificati come stringhe o corrispondono ai tipi di dati del campo in base alla posizione se non alle stringhe, ad esempio gli indici integer. Vedere pandas. DataFrame per come etichettare le colonne quando si costruisce un pandas.DataFrameoggetto .

Tutti i dati per un cogroup vengono caricati in memoria prima dell'applicazione della funzione. Ciò può causare eccezioni di memoria insufficiente, soprattutto se le dimensioni del gruppo sono asimmetriche. La configurazione per maxRecordsPerBatch non viene applicata ed è necessario assicurarsi che i dati raggruppati si adattino alla memoria disponibile.

Nell'esempio seguente viene illustrato come usare groupby().cogroup().applyInPandas() per eseguire un asof join set di dati tra due set di dati.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.PandasCogroupedOps.applyInPandas.