DataFrame – třída

Distribuovaná kolekce dat seskupených do pojmenovaných sloupců

Datový rámec je ekvivalentem relační tabulky ve Spark SQL a dá se vytvořit pomocí různých funkcí ve SparkSession.

Důležité

Datový rámec by neměl být vytvořen přímo pomocí konstruktoru.

Podporuje Spark Connect.

Vlastnosti

Vlastnictví Description
sparkSession Vrátí SparkSession , která vytvořila tento datový rámec.
rdd Vrátí obsah jako RDD řádku (pouze klasický režim).
na Vrátí objekt DataFrameNaFunctions pro zpracování chybějících hodnot.
stat Vrátí funkci DataFrameStatFunctions pro statistické funkce.
write Rozhraní pro ukládání obsahu nestreamovaného datového rámce do externího úložiště.
writeStream Rozhraní pro ukládání obsahu streamovaného datového rámce do externího úložiště
schema Vrátí schéma tohoto datového rámce jako typ struktury.
dtypes Vrátí všechny názvy sloupců a jejich datové typy jako seznam.
columns Načte názvy všech sloupců v datovém rámci jako seznam.
storageLevel Získejte aktuální úroveň úložiště datového rámce.
isStreaming Vrátí hodnotu True, pokud tento datový rámec obsahuje jeden nebo více zdrojů, které průběžně vracejí data při jejich doručení.
executionInfo Vrátí objekt ExecutionInfo po provedení dotazu.
plot Vrátí PySparkPlotAccessor pro vykreslení funkcí.

Methods

Zobrazení a kontrola dat

Metoda Description
toJSON(use_unicode) Převede datový rámec na rdD řetězce nebo datového rámce.
printSchema(level) Vytiskne schéma ve stromovém formátu.
explain(extended, mode) Vytiskne plány (logické a fyzické) do konzoly pro účely ladění.
show(n, truncate, vertical) Vytiskne prvních n řádků datového rámce do konzoly.
collect() Vrátí všechny záznamy v datovém rámci jako seznam řádků.
toLocalIterator(prefetchPartitions) Vrátí iterátor, který obsahuje všechny řádky v tomto datovém rámci.
take(num) Vrátí první číslo řádků jako seznam řádků.
tail(num) Vrátí poslední počet řádků jako seznam řádků.
head(n) Vrátí prvních n řádků.
first() Vrátí první řádek jako řádek.
count() Vrátí počet řádků v tomto datovém rámci.
isEmpty() Zkontroluje, jestli je datový rámec prázdný a vrátí logickou hodnotu.
describe(*cols) Vypočítá základní statistiku pro číselné a řetězcové sloupce.
summary(*statistics) Vypočítá zadanou statistiku pro číselné a řetězcové sloupce.

Dočasná zobrazení

Metoda Description
createTempView(name) Vytvoří místní dočasné zobrazení s tímto datovým rámcem.
createOrReplaceTempView(name) Vytvoří nebo nahradí místní dočasné zobrazení tímto datovým rámcem.
createGlobalTempView(name) Vytvoří globální dočasné zobrazení s tímto datovým rámcem.
createOrReplaceGlobalTempView(name) Vytvoří nebo nahradí globální dočasné zobrazení s použitím daného názvu.

Výběr a projekce

Metoda Description
select(*cols) Prodá sadu výrazů a vrátí nový datový rámec.
selectExpr(*expr) Prodá sadu výrazů SQL a vrátí nový datový rámec.
filter(condition) Filtruje řádky pomocí dané podmínky.
where(condition) Alias pro filtr
drop(*cols) Vrátí nový datový rámec bez zadaných sloupců.
toDF(*cols) Vrátí nový datový rámec s novými zadanými názvy sloupců.
withColumn(colName, col) Vrátí nový datový rámec přidáním sloupce nebo nahrazením existujícího sloupce se stejným názvem.
withColumns(*colsMap) Vrátí nový datový rámec přidáním více sloupců nebo nahrazením existujících sloupců se stejnými názvy.
withColumnRenamed(existing, new) Vrátí nový datový rámec přejmenováním existujícího sloupce.
withColumnsRenamed(colsMap) Vrátí nový datový rámec přejmenováním více sloupců.
withMetadata(columnName, metadata) Vrátí nový datový rámec aktualizací existujícího sloupce s metadaty.
metadataColumn(colName) Vybere sloupec metadat na základě názvu logického sloupce a vrátí ho jako sloupec.
colRegex(colName) Vybere sloupec na základě názvu sloupce zadaného jako regulární výraz a vrátí ho jako sloupec.

Řazení a řazení

Metoda Description
sort(*cols, **kwargs) Vrátí nový datový rámec seřazený podle zadaných sloupců.
orderBy(*cols, **kwargs) Alias pro řazení
sortWithinPartitions(*cols, **kwargs) Vrátí nový datový rámec s každým oddílem seřazeným podle zadaných sloupců.

Agregace a seskupení

Metoda Description
groupBy(*cols) Seskupí datový rámec podle zadaných sloupců, aby s nimi bylo možné provést agregaci.
rollup(*cols) Vytvořte vícerozměrnou kumulativní aktualizaci pro aktuální datový rámec pomocí zadaných sloupců.
cube(*cols) Vytvořte multidimenzionální datovou krychli pro aktuální datový rámec pomocí zadaných sloupců.
groupingSets(groupingSets, *cols) Vytvořte multidimenzionální agregaci pro aktuální datový rámec pomocí zadaných sad seskupení.
agg(*exprs) Agregujte celý datový rámec bez skupin (zkratka pro df.groupBy().agg()).
observe(observation, *exprs) Definujte (pojmenované) metriky, které chcete sledovat v datovém rámci.

Joins

Metoda Description
join(other, on, how) Spojí s jiným datovým rámcem pomocí daného výrazu join.
crossJoin(other) Vrátí kartézský součin s jiným datovým rámcem.
lateralJoin(other, on, how) Laterální spojení s jiným datovým rámcem pomocí daného výrazu join.

Nastavení operací

Metoda Description
union(other) Vrátí nový datový rámec obsahující sjednocení řádků v tomto a jiném datovém rámci.
unionByName(other, allowMissingColumns) Vrátí nový datový rámec obsahující sjednocení řádků v tomto a jiném datovém rámci.
intersect(other) Vrátí nový datový rámec obsahující řádky pouze v tomto datovém rámci i v jiném datovém rámci.
intersectAll(other) Vrátí nový datový rámec obsahující řádky v tomto datovém rámci i v jiném datovém rámci při zachování duplicit.
subtract(other) Vrátí nový datový rámec obsahující řádky v tomto datovém rámci, ale ne v jiném datovém rámci.
exceptAll(other) Vrátí nový datový rámec obsahující řádky v tomto datovém rámci, ale ne v jiném datovém rámci při zachování duplicit.

Deduplication

Metoda Description
distinct() Vrátí nový datový rámec obsahující jedinečné řádky v tomto datovém rámci.
dropDuplicates(subset) Vrátí nový datový rámec s odstraněnými duplicitními řádky, volitelně pouze s ohledem na určité sloupce.
dropDuplicatesWithinWatermark(subset) Vrátí nový datový rámec s odstraněnými duplicitními řádky, volitelně pouze s ohledem na určité sloupce v rámci vodoznaku.

Vzorkování a rozdělení

Metoda Description
sample(withReplacement, fraction, seed) Vrátí vzorek podmnožinu tohoto datového rámce.
sampleBy(col, fractions, seed) Vrátí stratifikovaný vzorek bez nahrazení na základě zlomku zadaného na každé vrstvě.
randomSplit(weights, seed) Náhodně rozdělí tento datový rámec s poskytnutými váhami.

Partitioning

Metoda Description
coalesce(numPartitions) Vrátí nový datový rámec s přesně numPartitions oddíly.
repartition(numPartitions, *cols) Vrátí nový datový rámec dělený danými výrazy dělení.
repartitionByRange(numPartitions, *cols) Vrátí nový datový rámec dělený danými výrazy dělení.
repartitionById(numPartitions, partitionIdCol) Vrátí nový datový rámec rozdělený podle daného výrazu ID oddílu.

Přeformování

Metoda Description
unpivot(ids, values, variableColumnName, valueColumnName) Převést datový rámec z širokého formátu na dlouhý.
melt(ids, values, variableColumnName, valueColumnName) Alias pro převést na řádky
transpose(indexColumn) Transponuje datový rámec tak, aby se hodnoty v zadaném sloupci indexu staly novými sloupci.

Chybějící zpracování dat

Metoda Description
dropna(how, thresh, subset) Vrátí nový datový rámec, který vynechá řádky s hodnotami null nebo NaN.
fillna(value, subset) Vrátí nový datový rámec, který hodnoty null vyplní novou hodnotou.
replace(to_replace, value, subset) Vrátí nový datový rámec, který nahradí hodnotu jinou hodnotou.

Statistické funkce

Metoda Description
approxQuantile(col, probabilities, relativeError) Vypočítá přibližné kvantové hodnoty číselných sloupců datového rámce.
corr(col1, col2, method) Vypočítá korelaci dvou sloupců datového rámce jako dvojité hodnoty.
cov(col1, col2) Vypočítejte kovarianci vzorku pro dané sloupce, které jsou určené jejich názvy.
crosstab(col1, col2) Vypočítá tabulku četnosti párů daných sloupců.
freqItems(cols, support) Hledání častých položek pro sloupce, pravděpodobně s falešně pozitivními výsledky

Operace schématu

Metoda Description
to(schema) Vrátí nový datový rámec, ve kterém je každý řádek odsouhlasený tak, aby odpovídal zadanému schématu.
alias(alias) Vrátí nový datový rámec se sadou aliasů.

Iterace

Metoda Description
foreach(f) Použije funkci f na všechny řádky tohoto datového rámce.
foreachPartition(f) Použije funkci f pro každý oddíl tohoto datového rámce.

Ukládání do mezipaměti a trvalost

Metoda Description
cache() Zachová datový rámec s výchozí úrovní úložiště (MEMORY_AND_DISK_DESER).
persist(storageLevel) Nastaví úroveň úložiště pro zachování obsahu datového rámce napříč operacemi.
unpersist(blocking) Označí datový rámec jako trvalý a odebere všechny bloky z paměti a disku.

Vytváření kontrolních bodů

Metoda Description
checkpoint(eager) Vrátí kontrolní bodovou verzi tohoto datového rámce.
localCheckpoint(eager, storageLevel) Vrátí místně označenou verzi tohoto datového rámce.

Operace streamování

Metoda Description
withWatermark(eventTime, delayThreshold) Definuje meze času události pro tento datový rámec.

Rady pro optimalizaci

Metoda Description
hint(name, *parameters) Určuje nápovědu k aktuálnímu datovému rámci.

Limity a posuny

Metoda Description
limit(num) Omezí počet výsledků na zadané číslo.
offset(num) Vrátí nový datový rámec přeskočením prvních n řádků.

Pokročilé transformace

Metoda Description
transform(func, *args, **kwargs) Vrátí nový datový rámec. Stručná syntaxe pro zřetězení vlastních transformací

Metody převodu

Metoda Description
toPandas() Vrátí obsah tohoto datového rámce jako pandas pandas. Datový rámec.
toArrow() Vrátí obsah tohoto datového rámce jako PyArrow pyarrow. Tabulka.
pandas_api(index_col) Převede existující datový rámec na datový rámec pandas-on-Spark.
mapInPandas(func, schema, barrier, profile) Mapuje iterátor dávek v aktuálním datovém rámci pomocí nativní funkce Pythonu.
mapInArrow(func, schema, barrier, profile) Mapuje iterátor dávek v aktuálním datovém rámci pomocí nativní funkce Pythonu, která se provádí na pyarrow. RecordBatch.

Zápis dat

Metoda Description
writeTo(table) Vytvořte tvůrce konfigurace zápisu pro zdroje v2.
mergeInto(table, condition) Sloučí sadu aktualizací, vložení a odstranění na základě zdrojové tabulky do cílové tabulky.

Porovnání datových rámců

Metoda Description
sameSemantics(other) Vrátí hodnotu True, pokud jsou plány logických dotazů uvnitř obou datových rámců stejné.
semanticHash() Vrátí kód hash logického plánu dotazu pro tento datový rámec.

Metadata a informace o souborech

Metoda Description
inputFiles() Vrátí snímek souborů, které tvoří tento datový rámec.

Pokročilé funkce SQL

Metoda Description
isLocal() Vrátí hodnotu True, pokud je možné spustit metody collect a take místně.
asTable() Převede datový rámec na objekt TableArg, který lze použít jako argument tabulky v TVF.
scalar() Vrátí objekt Column pro poddotaz SCALAR obsahující přesně jeden řádek a jeden sloupec.
exists() Vrátí objekt Column pro poddotaz EXISTS.

Příklady

Základní operace datového rámce

# Create a DataFrame
people = spark.createDataFrame([
    {"deptId": 1, "age": 40, "name": "Alice", "gender": "M", "salary": 50},
    {"deptId": 1, "age": 50, "name": "Bob", "gender": "M", "salary": 100},
    {"deptId": 2, "age": 60, "name": "Sue", "gender": "F", "salary": 150},
    {"deptId": 3, "age": 20, "name": "Tom", "gender": "M", "salary": 200}
])

# Select columns
people.select("name", "age").show()

# Filter rows
people.filter(people.age > 30).show()

# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()

Agregace a seskupení

# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()

# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
    F.avg("salary").alias("avg_salary"),
    F.max("age").alias("max_age")
).show()

Joins

# Create another DataFrame
department = spark.createDataFrame([
    {"id": 1, "name": "PySpark"},
    {"id": 2, "name": "ML"},
    {"id": 3, "name": "Spark SQL"}
])

# Join DataFrames
people.join(department, people.deptId == department.id).show()

Složité transformace

# Chained operations
result = people.filter(people.age > 30) \\
    .join(department, people.deptId == department.id) \\
    .groupBy(department.name, "gender") \\
    .agg({"salary": "avg", "age": "max"}) \\
    .sort("max(age)")
result.show()