Eine verteilte Sammlung von Daten, die in benannte Spalten gruppiert sind.
Ein DataFrame entspricht einer relationalen Tabelle in Spark SQL und kann mithilfe verschiedener Funktionen in SparkSession erstellt werden.
Von Bedeutung
Ein DataFrame sollte nicht direkt mit dem Konstruktor erstellt werden.
Unterstützt Spark Connect
Eigenschaften
| Eigentum |
Beschreibung |
sparkSession |
Gibt SparkSession zurück, die diesen DataFrame erstellt hat. |
rdd |
Gibt den Inhalt als RDD der Zeile zurück (nur im klassischen Modus). |
na |
Gibt einen DataFrameNaFunctions-Wert für die Behandlung fehlender Werte zurück. |
stat |
Gibt eine DataFrameStatFunctions für Statistikfunktionen zurück. |
write |
Schnittstelle zum Speichern des Inhalts des nicht streamingenden DataFrames in externen Speicher. |
writeStream |
Schnittstelle zum Speichern des Inhalts des Streaming-DataFrames in externen Speicher. |
schema |
Gibt das Schema dieses DataFrames als StructType zurück. |
dtypes |
Gibt alle Spaltennamen und deren Datentypen als Liste zurück. |
columns |
Ruft die Namen aller Spalten im DataFrame als Liste ab. |
storageLevel |
Rufen Sie die aktuelle Speicherebene von DataFrame ab. |
isStreaming |
Gibt True zurück, wenn dieser DataFrame eine oder mehrere Quellen enthält, die beim Eintreffen fortlaufend Daten zurückgeben. |
executionInfo |
Gibt ein ExecutionInfo -Objekt zurück, nachdem die Abfrage ausgeführt wurde. |
plot |
Gibt einen PySparkPlotAccessor für das Zeichnen von Funktionen zurück. |
Methodik
Datenanzeige und -inspektion
| Methode |
Beschreibung |
toJSON(use_unicode) |
Konvertiert einen DataFrame in eine RDD von Zeichenfolge oder DataFrame. |
printSchema(level) |
Druckt das Schema im Strukturformat aus. |
explain(extended, mode) |
Druckt die Pläne (logische und physische) für debuggingzwecke in der Konsole. |
show(n, truncate, vertical) |
Druckt die ersten n Zeilen des DataFrame in der Konsole. |
collect() |
Gibt alle Datensätze im DataFrame als Liste der Zeile zurück. |
toLocalIterator(prefetchPartitions) |
Gibt einen Iterator zurück, der alle Zeilen in diesem DataFrame enthält. |
take(num) |
Gibt die ersten Zeilen als Liste der Zeile zurück. |
tail(num) |
Gibt die letzten Zeilen als Liste der Zeile zurück. |
head(n) |
Gibt die ersten n Zeilen zurück. |
first() |
Gibt die erste Zeile als Zeile zurück. |
count() |
Gibt die Anzahl der Zeilen in diesem DataFrame zurück. |
isEmpty() |
Überprüft, ob der DataFrame leer ist und einen booleschen Wert zurückgibt. |
describe(*cols) |
Berechnet grundlegende Statistiken für numerische und Zeichenfolgenspalten. |
summary(*statistics) |
Berechnet angegebene Statistiken für numerische und Zeichenfolgenspalten. |
Temporäre Ansichten
Auswahl und Projektion
| Methode |
Beschreibung |
select(*cols) |
Projiziert einen Satz von Ausdrücken und gibt einen neuen DataFrame zurück. |
selectExpr(*expr) |
Projiziert eine Reihe von SQL-Ausdrücken und gibt einen neuen DataFrame zurück. |
filter(condition) |
Filtert Zeilen mithilfe der angegebenen Bedingung. |
where(condition) |
Alias für Filter. |
drop(*cols) |
Gibt einen neuen DataFrame ohne angegebene Spalten zurück. |
toDF(*cols) |
Gibt einen neuen DataFrame mit neuen angegebenen Spaltennamen zurück. |
withColumn(colName, col) |
Gibt einen neuen DataFrame zurück, indem eine Spalte hinzugefügt oder die vorhandene Spalte ersetzt wird, die denselben Namen hat. |
withColumns(*colsMap) |
Gibt einen neuen DataFrame zurück, indem mehrere Spalten hinzugefügt oder die vorhandenen Spalten mit denselben Namen ersetzt werden. |
withColumnRenamed(existing, new) |
Gibt einen neuen DataFrame zurück, indem eine vorhandene Spalte umbenannt wird. |
withColumnsRenamed(colsMap) |
Gibt einen neuen DataFrame zurück, indem mehrere Spalten umbenannt werden. |
withMetadata(columnName, metadata) |
Gibt einen neuen DataFrame zurück, indem eine vorhandene Spalte mit Metadaten aktualisiert wird. |
metadataColumn(colName) |
Wählt eine Metadatenspalte basierend auf dem logischen Spaltennamen aus und gibt sie als Spalte zurück. |
colRegex(colName) |
Wählt die Spalte basierend auf dem spaltennamen aus, der als regex angegeben wurde, und gibt sie als Spalte zurück. |
Sortieren und Sortieren
Aggregation und Gruppierung
| Methode |
Beschreibung |
groupBy(*cols) |
Gruppiert den DataFrame nach den angegebenen Spalten, sodass die Aggregation für sie ausgeführt werden kann. |
rollup(*cols) |
Erstellen Sie ein mehrdimensionales Rollup für den aktuellen DataFrame mithilfe der angegebenen Spalten. |
cube(*cols) |
Erstellen Sie einen mehrdimensionalen Cube für den aktuellen DataFrame mithilfe der angegebenen Spalten. |
groupingSets(groupingSets, *cols) |
Erstellen Sie eine mehrdimensionale Aggregation für den aktuellen DataFrame mithilfe der angegebenen Gruppierungssätze. |
agg(*exprs) |
Aggregat für den gesamten DataFrame ohne Gruppen (Kurzform für df.groupBy().agg()). |
observe(observation, *exprs) |
Definieren (benannter) Metriken, die im DataFrame beobachtet werden sollen. |
Verknüpfungen
Festlegen von Vorgängen
| Methode |
Beschreibung |
union(other) |
Gibt einen neuen DataFrame zurück, der die Vereinigung von Zeilen in diesem und einem anderen DataFrame enthält. |
unionByName(other, allowMissingColumns) |
Gibt einen neuen DataFrame zurück, der die Vereinigung von Zeilen in diesem und einem anderen DataFrame enthält. |
intersect(other) |
Gibt einen neuen DataFrame zurück, der nur Zeilen in diesem DataFrame und einem anderen DataFrame enthält. |
intersectAll(other) |
Gibt einen neuen DataFrame zurück, der Zeilen in diesem DataFrame und einem anderen DataFrame enthält, während Duplikate beibehalten werden. |
subtract(other) |
Gibt einen neuen DataFrame zurück, der Zeilen in diesem DataFrame enthält, aber nicht in einem anderen DataFrame. |
exceptAll(other) |
Gibt einen neuen DataFrame zurück, der Zeilen in diesem DataFrame enthält, aber nicht in einem anderen DataFrame, während Duplikate beibehalten werden. |
Deduplication
| Methode |
Beschreibung |
distinct() |
Gibt einen neuen DataFrame zurück, der die unterschiedlichen Zeilen in diesem DataFrame enthält. |
dropDuplicates(subset) |
Zurückgeben eines neuen DataFrames mit entfernten doppelten Zeilen, optional nur unter Berücksichtigung bestimmter Spalten. |
dropDuplicatesWithinWatermark(subset) |
Zurückgeben eines neuen DataFrames mit entfernten doppelten Zeilen, optional nur unter Berücksichtigung bestimmter Spalten innerhalb des Wasserzeichens. |
Sampling und Splitting
Partitionierung
| Methode |
Beschreibung |
coalesce(numPartitions) |
Gibt einen neuen DataFrame zurück, der genau numPartitions-Partitionen enthält. |
repartition(numPartitions, *cols) |
Gibt einen neuen DataFrame zurück, der von den angegebenen Partitionierungsausdrücken partitioniert wird. |
repartitionByRange(numPartitions, *cols) |
Gibt einen neuen DataFrame zurück, der von den angegebenen Partitionierungsausdrücken partitioniert wird. |
repartitionById(numPartitions, partitionIdCol) |
Gibt einen neuen DataFrame zurück, der vom angegebenen Partitions-ID-Ausdruck partitioniert wird. |
Umformung
Behandlung fehlender Daten
Statistische Funktionen
Schemavorgänge
| Methode |
Beschreibung |
to(schema) |
Gibt einen neuen DataFrame zurück, in dem jede Zeile mit dem angegebenen Schema übereinstimmt. |
alias(alias) |
Gibt einen neuen DataFrame mit einem Aliassatz zurück. |
Iteration
| Methode |
Beschreibung |
foreach(f) |
Wendet die f-Funktion auf alle Zeilen dieses DataFrame an. |
foreachPartition(f) |
Wendet die f-Funktion auf jede Partition dieses DataFrame an. |
Zwischenspeichern und Persistenz
| Methode |
Beschreibung |
cache() |
Persists the DataFrame with the default storage level (MEMORY_AND_DISK_DESER). |
persist(storageLevel) |
Legt die Speicherebene fest, um den Inhalt des DataFrames über Vorgänge hinweg beizubehalten. |
unpersist(blocking) |
Markiert den DataFrame als nicht persistent, und entfernt alle Blöcke dafür aus dem Arbeitsspeicher und dem Datenträger. |
Erstellen von Prüfpunkten
Streamingvorgänge
Optimierungshinweise
Grenzwerte und Offsets
| Methode |
Beschreibung |
limit(num) |
Beschränkt die Ergebnisanzahl auf die angegebene Zahl. |
offset(num) |
Gibt einen neuen DataFrame zurück, indem die ersten n Zeilen übersprungen werden. |
Konvertierungsmethoden
Schreiben von Daten
| Methode |
Beschreibung |
writeTo(table) |
Erstellen Sie einen Schreibkonfigurations-Generator für v2-Quellen. |
mergeInto(table, condition) |
Führt eine Reihe von Aktualisierungen, Einfügungen und Löschungen basierend auf einer Quelltabelle in eine Zieltabelle zusammen. |
DataFrame-Vergleich
| Methode |
Beschreibung |
sameSemantics(other) |
Gibt True zurück, wenn die logischen Abfragepläne in beiden DataFrames gleich sind. |
semanticHash() |
Gibt einen Hashcode des logischen Abfrageplans für diesen DataFrame zurück. |
| Methode |
Beschreibung |
inputFiles() |
Gibt eine Momentaufnahme der Dateien zurück, die diesen DataFrame verfassen. |
Erweiterte SQL-Features
| Methode |
Beschreibung |
isLocal() |
Gibt True zurück, wenn die Methoden zum Sammeln und Übernehmen lokal ausgeführt werden können. |
asTable() |
Konvertiert den DataFrame in ein TableArg -Objekt, das als Tabellenargument in einem TVF verwendet werden kann. |
scalar() |
Gibt ein Column-Objekt für eine SKALAR-Unterabfrage zurück, die genau eine Zeile und eine Spalte enthält. |
exists() |
Gibt ein Column-Objekt für eine EXISTS-Unterabfrage zurück. |
Beispiele
Grundlegende DataFrame-Vorgänge
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
Aggregation und Gruppierung
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()
Verknüpfungen
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()