Freigeben über


DataFrame-Klasse

Eine verteilte Sammlung von Daten, die in benannte Spalten gruppiert sind.

Ein DataFrame entspricht einer relationalen Tabelle in Spark SQL und kann mithilfe verschiedener Funktionen in SparkSession erstellt werden.

Von Bedeutung

Ein DataFrame sollte nicht direkt mit dem Konstruktor erstellt werden.

Unterstützt Spark Connect

Eigenschaften

Eigentum Beschreibung
sparkSession Gibt SparkSession zurück, die diesen DataFrame erstellt hat.
rdd Gibt den Inhalt als RDD der Zeile zurück (nur im klassischen Modus).
na Gibt einen DataFrameNaFunctions-Wert für die Behandlung fehlender Werte zurück.
stat Gibt eine DataFrameStatFunctions für Statistikfunktionen zurück.
write Schnittstelle zum Speichern des Inhalts des nicht streamingenden DataFrames in externen Speicher.
writeStream Schnittstelle zum Speichern des Inhalts des Streaming-DataFrames in externen Speicher.
schema Gibt das Schema dieses DataFrames als StructType zurück.
dtypes Gibt alle Spaltennamen und deren Datentypen als Liste zurück.
columns Ruft die Namen aller Spalten im DataFrame als Liste ab.
storageLevel Rufen Sie die aktuelle Speicherebene von DataFrame ab.
isStreaming Gibt True zurück, wenn dieser DataFrame eine oder mehrere Quellen enthält, die beim Eintreffen fortlaufend Daten zurückgeben.
executionInfo Gibt ein ExecutionInfo -Objekt zurück, nachdem die Abfrage ausgeführt wurde.
plot Gibt einen PySparkPlotAccessor für das Zeichnen von Funktionen zurück.

Methodik

Datenanzeige und -inspektion

Methode Beschreibung
toJSON(use_unicode) Konvertiert einen DataFrame in eine RDD von Zeichenfolge oder DataFrame.
printSchema(level) Druckt das Schema im Strukturformat aus.
explain(extended, mode) Druckt die Pläne (logische und physische) für debuggingzwecke in der Konsole.
show(n, truncate, vertical) Druckt die ersten n Zeilen des DataFrame in der Konsole.
collect() Gibt alle Datensätze im DataFrame als Liste der Zeile zurück.
toLocalIterator(prefetchPartitions) Gibt einen Iterator zurück, der alle Zeilen in diesem DataFrame enthält.
take(num) Gibt die ersten Zeilen als Liste der Zeile zurück.
tail(num) Gibt die letzten Zeilen als Liste der Zeile zurück.
head(n) Gibt die ersten n Zeilen zurück.
first() Gibt die erste Zeile als Zeile zurück.
count() Gibt die Anzahl der Zeilen in diesem DataFrame zurück.
isEmpty() Überprüft, ob der DataFrame leer ist und einen booleschen Wert zurückgibt.
describe(*cols) Berechnet grundlegende Statistiken für numerische und Zeichenfolgenspalten.
summary(*statistics) Berechnet angegebene Statistiken für numerische und Zeichenfolgenspalten.

Temporäre Ansichten

Methode Beschreibung
createTempView(name) Erstellt eine lokale temporäre Ansicht mit diesem DataFrame.
createOrReplaceTempView(name) Erstellt oder ersetzt eine lokale temporäre Ansicht durch diesen DataFrame.
createGlobalTempView(name) Erstellt eine globale temporäre Ansicht mit diesem DataFrame.
createOrReplaceGlobalTempView(name) Erstellt oder ersetzt eine globale temporäre Ansicht mit dem angegebenen Namen.

Auswahl und Projektion

Methode Beschreibung
select(*cols) Projiziert einen Satz von Ausdrücken und gibt einen neuen DataFrame zurück.
selectExpr(*expr) Projiziert eine Reihe von SQL-Ausdrücken und gibt einen neuen DataFrame zurück.
filter(condition) Filtert Zeilen mithilfe der angegebenen Bedingung.
where(condition) Alias für Filter.
drop(*cols) Gibt einen neuen DataFrame ohne angegebene Spalten zurück.
toDF(*cols) Gibt einen neuen DataFrame mit neuen angegebenen Spaltennamen zurück.
withColumn(colName, col) Gibt einen neuen DataFrame zurück, indem eine Spalte hinzugefügt oder die vorhandene Spalte ersetzt wird, die denselben Namen hat.
withColumns(*colsMap) Gibt einen neuen DataFrame zurück, indem mehrere Spalten hinzugefügt oder die vorhandenen Spalten mit denselben Namen ersetzt werden.
withColumnRenamed(existing, new) Gibt einen neuen DataFrame zurück, indem eine vorhandene Spalte umbenannt wird.
withColumnsRenamed(colsMap) Gibt einen neuen DataFrame zurück, indem mehrere Spalten umbenannt werden.
withMetadata(columnName, metadata) Gibt einen neuen DataFrame zurück, indem eine vorhandene Spalte mit Metadaten aktualisiert wird.
metadataColumn(colName) Wählt eine Metadatenspalte basierend auf dem logischen Spaltennamen aus und gibt sie als Spalte zurück.
colRegex(colName) Wählt die Spalte basierend auf dem spaltennamen aus, der als regex angegeben wurde, und gibt sie als Spalte zurück.

Sortieren und Sortieren

Methode Beschreibung
sort(*cols, **kwargs) Gibt einen neuen DataFrame zurück, der nach den angegebenen Spalten sortiert ist.
orderBy(*cols, **kwargs) Alias für sortierung.
sortWithinPartitions(*cols, **kwargs) Gibt einen neuen DataFrame zurück, wobei jede Partition nach den angegebenen Spalten sortiert ist.

Aggregation und Gruppierung

Methode Beschreibung
groupBy(*cols) Gruppiert den DataFrame nach den angegebenen Spalten, sodass die Aggregation für sie ausgeführt werden kann.
rollup(*cols) Erstellen Sie ein mehrdimensionales Rollup für den aktuellen DataFrame mithilfe der angegebenen Spalten.
cube(*cols) Erstellen Sie einen mehrdimensionalen Cube für den aktuellen DataFrame mithilfe der angegebenen Spalten.
groupingSets(groupingSets, *cols) Erstellen Sie eine mehrdimensionale Aggregation für den aktuellen DataFrame mithilfe der angegebenen Gruppierungssätze.
agg(*exprs) Aggregat für den gesamten DataFrame ohne Gruppen (Kurzform für df.groupBy().agg()).
observe(observation, *exprs) Definieren (benannter) Metriken, die im DataFrame beobachtet werden sollen.

Verknüpfungen

Methode Beschreibung
join(other, on, how) Verknüpft mit einem anderen DataFrame mithilfe des angegebenen Verknüpfungsausdrucks.
crossJoin(other) Gibt das kartesische Produkt mit einem anderen DataFrame zurück.
lateralJoin(other, on, how) Lateral joins with another DataFrame, using the given join expression.

Festlegen von Vorgängen

Methode Beschreibung
union(other) Gibt einen neuen DataFrame zurück, der die Vereinigung von Zeilen in diesem und einem anderen DataFrame enthält.
unionByName(other, allowMissingColumns) Gibt einen neuen DataFrame zurück, der die Vereinigung von Zeilen in diesem und einem anderen DataFrame enthält.
intersect(other) Gibt einen neuen DataFrame zurück, der nur Zeilen in diesem DataFrame und einem anderen DataFrame enthält.
intersectAll(other) Gibt einen neuen DataFrame zurück, der Zeilen in diesem DataFrame und einem anderen DataFrame enthält, während Duplikate beibehalten werden.
subtract(other) Gibt einen neuen DataFrame zurück, der Zeilen in diesem DataFrame enthält, aber nicht in einem anderen DataFrame.
exceptAll(other) Gibt einen neuen DataFrame zurück, der Zeilen in diesem DataFrame enthält, aber nicht in einem anderen DataFrame, während Duplikate beibehalten werden.

Deduplication

Methode Beschreibung
distinct() Gibt einen neuen DataFrame zurück, der die unterschiedlichen Zeilen in diesem DataFrame enthält.
dropDuplicates(subset) Zurückgeben eines neuen DataFrames mit entfernten doppelten Zeilen, optional nur unter Berücksichtigung bestimmter Spalten.
dropDuplicatesWithinWatermark(subset) Zurückgeben eines neuen DataFrames mit entfernten doppelten Zeilen, optional nur unter Berücksichtigung bestimmter Spalten innerhalb des Wasserzeichens.

Sampling und Splitting

Methode Beschreibung
sample(withReplacement, fraction, seed) Gibt eine beispielierte Teilmenge dieses DataFrame zurück.
sampleBy(col, fractions, seed) Gibt eine gestazierte Probe ohne Ersatz basierend auf dem Bruch zurück, der auf den einzelnen Schichten angegeben ist.
randomSplit(weights, seed) Teilt diesen DataFrame zufällig mit den bereitgestellten Gewichtungen auf.

Partitionierung

Methode Beschreibung
coalesce(numPartitions) Gibt einen neuen DataFrame zurück, der genau numPartitions-Partitionen enthält.
repartition(numPartitions, *cols) Gibt einen neuen DataFrame zurück, der von den angegebenen Partitionierungsausdrücken partitioniert wird.
repartitionByRange(numPartitions, *cols) Gibt einen neuen DataFrame zurück, der von den angegebenen Partitionierungsausdrücken partitioniert wird.
repartitionById(numPartitions, partitionIdCol) Gibt einen neuen DataFrame zurück, der vom angegebenen Partitions-ID-Ausdruck partitioniert wird.

Umformung

Methode Beschreibung
unpivot(ids, values, variableColumnName, valueColumnName) Heben Sie das Pivotieren eines DataFrames von einem breiten Format in ein langes Format auf.
melt(ids, values, variableColumnName, valueColumnName) Alias für das Aufheben der Pivot-Funktion.
transpose(indexColumn) Transponiert einen DataFrame so, dass die Werte in der angegebenen Indexspalte zu den neuen Spalten werden.

Behandlung fehlender Daten

Methode Beschreibung
dropna(how, thresh, subset) Gibt einen neuen DataFrame zurück, der Zeilen mit Null- oder NaN-Werten weggelassen.
fillna(value, subset) Gibt einen neuen DataFrame zurück, der Nullwerte mit einem neuen Wert gefüllt wird.
replace(to_replace, value, subset) Gibt einen neuen DataFrame zurück, der einen Wert durch einen anderen Wert ersetzt.

Statistische Funktionen

Methode Beschreibung
approxQuantile(col, probabilities, relativeError) Berechnet die ungefähren Quantiles numerischer Spalten eines DataFrames.
corr(col1, col2, method) Berechnet die Korrelation von zwei Spalten eines DataFrames als doppelten Wert.
cov(col1, col2) Berechnen Sie die Stichprobenkovarianz für die angegebenen Spalten, die durch ihre Namen angegeben werden.
crosstab(col1, col2) Berechnet eine paarweise Häufigkeitstabelle der angegebenen Spalten.
freqItems(cols, support) Suchen häufiger Elemente für Spalten, möglicherweise mit falsch positiven Ergebnissen.

Schemavorgänge

Methode Beschreibung
to(schema) Gibt einen neuen DataFrame zurück, in dem jede Zeile mit dem angegebenen Schema übereinstimmt.
alias(alias) Gibt einen neuen DataFrame mit einem Aliassatz zurück.

Iteration

Methode Beschreibung
foreach(f) Wendet die f-Funktion auf alle Zeilen dieses DataFrame an.
foreachPartition(f) Wendet die f-Funktion auf jede Partition dieses DataFrame an.

Zwischenspeichern und Persistenz

Methode Beschreibung
cache() Persists the DataFrame with the default storage level (MEMORY_AND_DISK_DESER).
persist(storageLevel) Legt die Speicherebene fest, um den Inhalt des DataFrames über Vorgänge hinweg beizubehalten.
unpersist(blocking) Markiert den DataFrame als nicht persistent, und entfernt alle Blöcke dafür aus dem Arbeitsspeicher und dem Datenträger.

Erstellen von Prüfpunkten

Methode Beschreibung
checkpoint(eager) Gibt eine prüfpunktierte Version dieses DataFrame zurück.
localCheckpoint(eager, storageLevel) Gibt eine lokal überprüfte Version dieses DataFrame zurück.

Streamingvorgänge

Methode Beschreibung
withWatermark(eventTime, delayThreshold) Definiert ein Ereigniszeitwasserzeichen für diesen DataFrame.

Optimierungshinweise

Methode Beschreibung
hint(name, *parameters) Gibt einen Hinweis auf den aktuellen DataFrame an.

Grenzwerte und Offsets

Methode Beschreibung
limit(num) Beschränkt die Ergebnisanzahl auf die angegebene Zahl.
offset(num) Gibt einen neuen DataFrame zurück, indem die ersten n Zeilen übersprungen werden.

Erweiterte Transformationen

Methode Beschreibung
transform(func, *args, **kwargs) Gibt einen neuen DataFrame zurück. Präzise Syntax zum Verketten von benutzerdefinierten Transformationen.

Konvertierungsmethoden

Methode Beschreibung
toPandas() Gibt den Inhalt dieses DataFrames als Pandas-Pandas zurück. DataFrame.
toArrow() Gibt den Inhalt dieses DataFrames als PyArrow-Pyarrow zurück. Tabelle.
pandas_api(index_col) Konvertiert den vorhandenen DataFrame in einen Pandas-on-Spark DataFrame.
mapInPandas(func, schema, barrier, profile) Ordnet einen Iterator von Batches im aktuellen DataFrame mithilfe einer nativen Python-Funktion zu.
mapInArrow(func, schema, barrier, profile) Ordnet einen Iterator von Batches im aktuellen DataFrame mithilfe einer nativen Python-Funktion zu, die auf Pyarrow ausgeführt wird. RecordBatch.

Schreiben von Daten

Methode Beschreibung
writeTo(table) Erstellen Sie einen Schreibkonfigurations-Generator für v2-Quellen.
mergeInto(table, condition) Führt eine Reihe von Aktualisierungen, Einfügungen und Löschungen basierend auf einer Quelltabelle in eine Zieltabelle zusammen.

DataFrame-Vergleich

Methode Beschreibung
sameSemantics(other) Gibt True zurück, wenn die logischen Abfragepläne in beiden DataFrames gleich sind.
semanticHash() Gibt einen Hashcode des logischen Abfrageplans für diesen DataFrame zurück.

Metadaten und Dateiinformationen

Methode Beschreibung
inputFiles() Gibt eine Momentaufnahme der Dateien zurück, die diesen DataFrame verfassen.

Erweiterte SQL-Features

Methode Beschreibung
isLocal() Gibt True zurück, wenn die Methoden zum Sammeln und Übernehmen lokal ausgeführt werden können.
asTable() Konvertiert den DataFrame in ein TableArg -Objekt, das als Tabellenargument in einem TVF verwendet werden kann.
scalar() Gibt ein Column-Objekt für eine SKALAR-Unterabfrage zurück, die genau eine Zeile und eine Spalte enthält.
exists() Gibt ein Column-Objekt für eine EXISTS-Unterabfrage zurück.

Beispiele

Grundlegende DataFrame-Vorgänge

# Create a DataFrame
people = spark.createDataFrame([
    {"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
    {"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
    {"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
    {"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])

# Select columns
people.select("name", "age").show()

# Filter rows
people.filter(people.age > 30).show()

# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()

Aggregation und Gruppierung

# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()

# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
    F.avg("salary").alias("avg_salary"),
    F.max("age").alias("max_age")
).show()

Verknüpfungen

# Create another DataFrame
department = spark.createDataFrame([
    {"id": 1, "name": "PySpark"},
    {"id": 2, "name": "ML"},
    {"id": 3, "name": "Spark SQL"}
])

# Join DataFrames
people.join(department, people.deptId == department.id).show()

Komplexe Transformationen

# Chained operations
result = people.filter(people.age > 30) \\
    .join(department, people.deptId == department.id) \\
    .groupBy(department.name, "gender") \\
    .agg({"salary": "avg", "age": "max"}) \\
    .sort("max(age)")
result.show()