API della funzione pandas
le API della funzione pandas consentono di applicare direttamente una funzione nativa Python che accetta e restituisce istanze pandas a un dataframe PySpark. Analogamente alle funzioni definite dall'utente pandas, le API di funzione usano anche Apache Arrow per trasferire i dati e pandas per lavorare con i dati; Tuttavia, gli hint di tipo Python sono facoltativi nelle API della funzione pandas.
Esistono tre tipi di API per le funzioni pandas:
- Mappa raggruppata
- Mappa
- Mappa raggruppata
le API della funzione pandas sfruttano la stessa logica interna usata dall'esecuzione di pandas UDF. Condividono caratteristiche come PyArrow, tipi SQL supportati e configurazioni.
Per altre informazioni, vedere il post di blog New Pandas UDFs e Python Type Hints nella prossima versione di Apache Spark 3.0.
Mappa raggruppata
Si trasformano i dati raggruppati usando groupBy().applyInPandas()
per implementare il modello "split-apply-combine". La combinazione split-apply è costituita da tre passaggi:
- Suddividere i dati in gruppi usando
DataFrame.groupBy
. - Applicare una funzione in ogni gruppo. L'input e l'output della funzione sono entrambi
pandas.DataFrame
. I dati di input contengono tutte le righe e le colonne per ogni gruppo. - Combinare i risultati in un nuovo
DataFrame
oggetto .
Per usare groupBy().applyInPandas()
, è necessario definire quanto segue:
- Funzione Python che definisce il calcolo per ogni gruppo
- Oggetto
StructType
o stringa che definisce lo schema dell'outputDataFrame
Le etichette di colonna del restituito pandas.DataFrame
devono corrispondere ai nomi dei campi nello schema di output definito se specificati come stringhe o corrispondono ai tipi di dati del campo in base alla posizione se non alle stringhe, ad esempio gli indici integer. Vedere pandas. DataFrame per come etichettare le colonne quando si costruisce un pandas.DataFrame
oggetto .
Tutti i dati per un gruppo vengono caricati in memoria prima dell'applicazione della funzione. Ciò può causare eccezioni di memoria insufficiente, soprattutto se le dimensioni del gruppo sono asimmetriche. La configurazione per maxRecordsPerBatch non viene applicata ai gruppi ed è necessario assicurarsi che i dati raggruppati si adattino alla memoria disponibile.
Nell'esempio seguente viene illustrato come usare groupby().apply()
per sottrarre la media da ogni valore del gruppo.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.GroupedData.applyInPandas.
Mappa
Eseguire operazioni di mapping con istanze DataFrame.mapInPandas()
pandas per trasformare un iteratore di pandas.DataFrame
in un altro iteratore di pandas.DataFrame
che rappresenta il dataframe PySpark corrente e restituisce il risultato come dataframe PySpark.
La funzione sottostante accetta e restituisce un iteratore di pandas.DataFrame
. Può restituire l'output di lunghezza arbitraria invece di alcuni pandas UDFs, ad esempio Series to Series.
Nell'esempio seguente viene illustrato come usare mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.DataFrame.mapInPandas.
Mappa raggruppata
Per le operazioni di mapping raggruppate con istanze pandas, usare DataFrame.groupby().cogroup().applyInPandas()
per raggruppare due PySpark DataFrame
s in base a una chiave comune e quindi applicare una funzione Python a ogni gruppo, come illustrato di seguito:
- Rilegare i dati in modo che i gruppi di ogni dataframe che condividono una chiave siano raggruppati insieme.
- Applicare una funzione a ogni cogroup. L'input della funzione è due
pandas.DataFrame
(con una tupla facoltativa che rappresenta la chiave). L'output della funzione è unpandas.DataFrame
oggetto . - Combinare l's
pandas.DataFrame
da tutti i gruppi in un nuovo PySparkDataFrame
.
Per usare groupBy().cogroup().applyInPandas()
, è necessario definire quanto segue:
- Funzione Python che definisce il calcolo per ogni cogroup.
- Oggetto
StructType
o stringa che definisce lo schema dell'output PySparkDataFrame
.
Le etichette di colonna del restituito pandas.DataFrame
devono corrispondere ai nomi dei campi nello schema di output definito se specificati come stringhe o corrispondono ai tipi di dati del campo in base alla posizione se non alle stringhe, ad esempio gli indici integer. Vedere pandas. DataFrame per come etichettare le colonne quando si costruisce un pandas.DataFrame
oggetto .
Tutti i dati per un cogroup vengono caricati in memoria prima dell'applicazione della funzione. Ciò può causare eccezioni di memoria insufficiente, soprattutto se le dimensioni del gruppo sono asimmetriche. La configurazione per maxRecordsPerBatch non viene applicata ed è necessario assicurarsi che i dati raggruppati si adattino alla memoria disponibile.
Nell'esempio seguente viene illustrato come usare groupby().cogroup().applyInPandas()
per eseguire un asof join
set di dati tra due set di dati.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.PandasCogroupedOps.applyInPandas.