Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Rozhraní API funkcí pandas umožňují přímo použít nativní funkci Pythonu, která přebírá a odesílá instance pandas do datového rámce PySpark. Podobně jako uživatelem definované funkce pandas používají rozhraní API funkcí pandas také Apache Arrow k přenosu dat a pandas ke zpracování těchto dat; v API funkcí pandas jsou však volitelné typové náznaky Pythonu.
Existují tři typy rozhraní API funkcí pandas:
- Seskupené mapy
- Mapa
- Kogroupovaná mapa
Rozhraní API funkcí pandas využívají stejnou interní logiku, kterou používá vykonávání pandas UDF. Sdílejí charakteristiky, jako jsou PyArrow, podporované typy SQL a konfigurace.
Další informace najdete v blogovém příspěvku Nové uživatelské definované funkce Pandas a typové pokyny Pythonu v nadcházející verzi Apache Spark 3.0.
Seskupené mapy
Seskupená data přetvoříte pomocí groupBy().applyInPandas() a implementujete vzorec „split-apply-combine“. Rozdělení, aplikace a kombinace se skládá ze tří kroků:
- Rozdělte data do skupin pomocí
DataFrame.groupBy. - Použijte funkci na každou skupinu. Vstup i výstup funkce jsou
pandas.DataFrame. Vstupní data obsahují všechny řádky a sloupce pro každou skupinu. - Zkombinujte výsledky do nového
DataFrame.
Pokud chcete použít groupBy().applyInPandas(), musíte definovat následující:
- Funkce Pythonu, která definuje výpočet pro každou skupinu
- Objekt
StructTypenebo řetězec, který definuje schéma výstupníhoDataFrame
Popisky sloupců vrácených pandas.DataFrame musí buď odpovídat názvům polí v definovaném výstupním schématu, pokud jsou zadány jako řetězce, nebo musí odpovídat datovým typům polí podle pozice, například celočíselné indexy. Viz pandas.DataFrame pro to, jak označit sloupce při vytváření pandas.DataFrame.
Všechna data pro skupinu se načtou do paměti před tím, než se funkce použije. To může vést k výjimkám typu nedostatek paměti, zejména pokud jsou velikosti skupin zkreslené. Konfigurace pro maxRecordsPerBatch není použita pro skupiny a je na vás, aby se seskupovaná data vešla do dostupné paměti.
Následující příklad ukazuje, jak pomocí groupby().apply() odečíst průměr od každé hodnoty ve skupině.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Pro podrobnosti o použití viz pyspark.sql.GroupedData.applyInPandas.
Mapa
Operace mapování s instancemi pandas provádíte DataFrame.mapInPandas(), aby bylo možné transformovat iterátor pandas.DataFrame na jiný iterátor pandas.DataFrame, který představuje aktuální datový rámec PySpark a vrátí výsledek jako datový rámec PySpark.
Základní funkce přebírá a vypíše iterátor pandas.DataFrame. Může vrátit výstup libovolné délky na rozdíl od některých uživatelsky definovaných funkcí pandas, jako je řady na řady.
Následující příklad ukazuje, jak používat mapInPandas():
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Podrobné pokyny pro použití naleznete v části pyspark.sql.DataFrame.mapInPandas.
Kogroupovaná mapa
U operací mapování se spoluskupenými instancemi pandas použijte DataFrame.groupby().cogroup().applyInPandas() ke spoluskupení dvou PySpark DataFramepomocí společného klíče a pak aplikujte funkci Pythonu na každou spoluskupinu, jak je znázorněno.
- Prohazujte data tak, aby se skupiny jednotlivých datových rámců, které sdílejí klíč, společně seskupily.
- Aplikujte funkci na každou skupinu. Vstup funkce jsou dva
pandas.DataFrame(s volitelnou n-ticí představující klíč). Výstupem funkce jepandas.DataFrame. - Sloučte
pandas.DataFrameze všech skupin do nového PySparkDataFrame.
Pokud chcete použít groupBy().cogroup().applyInPandas(), musíte definovat následující:
- Funkce Pythonu, která definuje výpočet pro každou spoluskupinu.
- Objekt
StructTypenebo řetězec, který definuje schéma výstupního PySparkDataFrame.
Popisky sloupců vrácených pandas.DataFrame musí buď odpovídat názvům polí v definovaném výstupním schématu, pokud jsou zadány jako řetězce, nebo musí odpovídat datovým typům polí podle pozice, například celočíselné indexy. Viz pandas.DataFrame pro to, jak označit sloupce při vytváření pandas.DataFrame.
Všechna data pro spoluskupinu se načtou do paměti před tím, než se funkce použije. To může vést k výjimkám typu nedostatek paměti, zejména pokud jsou velikosti skupin zkreslené. Konfigurace pro maxRecordsPerBatch se nepoužije a je na vás, abyste zajistili, že spoluskupovaná data zapadnou do dostupné paměti.
Následující příklad ukazuje, jak pomocí groupby().cogroup().applyInPandas() provést asof join mezi dvěma datovými sadami.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Pro podrobné informace o využití se podívejte na pyspark.sql.PandasCogroupedOps.applyInPandas.