A distributed collection of data grouped into named columns.
A DataFrame is equivalent to a relational table in Spark SQL, and can be created using various functions in SparkSession.
Important: A DataFrame should not be created directly via its constructor; use the factory methods on SparkSession (such as SparkSession.createDataFrame) instead.

Supports Spark Connect.
Properties
| Property | Description |
| --- | --- |
| sparkSession | Returns the SparkSession that created this DataFrame. |
| rdd | Returns the content as an RDD of Row (Classic mode only). |
| na | Returns a DataFrameNaFunctions for handling missing values. |
| stat | Returns a DataFrameStatFunctions for statistic functions. |
| write | Interface for saving the content of the non-streaming DataFrame to external storage. |
| writeStream | Interface for saving the content of the streaming DataFrame to external storage. |
| schema | Returns the schema of this DataFrame as a StructType. |
| dtypes | Returns all column names and their data types as a list of (name, type) pairs. |
| columns | Retrieves the names of all columns in the DataFrame as a list. |
| storageLevel | Gets the DataFrame's current storage level. |
| isStreaming | Returns True if this DataFrame contains one or more sources that continuously return data as it arrives. |
| executionInfo | Returns an ExecutionInfo object after the query has been executed. |
| plot | Returns a PySparkPlotAccessor for plotting functions. |
Methods
Data viewing and inspection
Temporary views
Selection and projection
Sorting and ordering
Aggregation and grouping
| Method | Description |
| --- | --- |
| groupBy(*cols) | Groups the DataFrame by the specified columns so that aggregations can be performed on them. |
| rollup(*cols) | Creates a multi-dimensional rollup for the current DataFrame using the specified columns. |
| cube(*cols) | Creates a multi-dimensional cube for the current DataFrame using the specified columns. |
| groupingSets(groupingSets, *cols) | Creates a multi-dimensional aggregation for the current DataFrame using the specified grouping sets. |
| agg(*exprs) | Aggregates on the entire DataFrame without groups (shorthand for df.groupBy().agg()). |
| observe(observation, *exprs) | Defines (named) metrics to observe on the DataFrame. |
Joins
Set operations
| Method | Description |
| --- | --- |
| union(other) | Returns a new DataFrame containing the union of rows in this and another DataFrame (duplicates preserved, like SQL UNION ALL). |
| unionByName(other, allowMissingColumns) | Returns a new DataFrame containing the union of rows in this and another DataFrame, resolving columns by name. |
| intersect(other) | Returns a new DataFrame containing only the distinct rows found in both this DataFrame and another DataFrame. |
| intersectAll(other) | Returns a new DataFrame containing rows in both this DataFrame and another DataFrame while preserving duplicates. |
| subtract(other) | Returns a new DataFrame containing rows in this DataFrame but not in another DataFrame. |
| exceptAll(other) | Returns a new DataFrame containing rows in this DataFrame but not in another DataFrame while preserving duplicates. |
Deduplication
| Method | Description |
| --- | --- |
| distinct() | Returns a new DataFrame containing the distinct rows of this DataFrame. |
| dropDuplicates(subset) | Returns a new DataFrame with duplicate rows removed, optionally considering only a subset of columns. |
| dropDuplicatesWithinWatermark(subset) | Returns a new DataFrame with duplicate rows removed within the event-time watermark, optionally considering only a subset of columns. |
Sampling and splitting
Partitioning
| Method | Description |
| --- | --- |
| coalesce(numPartitions) | Returns a new DataFrame that has exactly numPartitions partitions. |
| repartition(numPartitions, *cols) | Returns a new DataFrame partitioned by the given partitioning expressions. |
| repartitionByRange(numPartitions, *cols) | Returns a new DataFrame range-partitioned by the given partitioning expressions. |
| repartitionById(numPartitions, partitionIdCol) | Returns a new DataFrame partitioned by the given partition ID expression. |
Reshaping
Missing data handling
Statistical functions
Schema operations
| Method | Description |
| --- | --- |
| to(schema) | Returns a new DataFrame where each row is reconciled to match the specified schema. |
| alias(alias) | Returns a new DataFrame with an alias set. |
Iteration
| Method | Description |
| --- | --- |
| foreach(f) | Applies the function f to each Row of this DataFrame. |
| foreachPartition(f) | Applies the function f to each partition of this DataFrame. |
Caching and persistence
| Method | Description |
| --- | --- |
| cache() | Persists the DataFrame with the default storage level (MEMORY_AND_DISK_DESER). |
| persist(storageLevel) | Sets the storage level to persist the contents of the DataFrame across operations. |
| unpersist(blocking) | Marks the DataFrame as non-persistent and removes all of its blocks from memory and disk. |
Checkpointing
Streaming operations
Optimization hints
Limits and offsets
| Method | Description |
| --- | --- |
| limit(num) | Limits the result count to the specified number of rows. |
| offset(num) | Returns a new DataFrame that skips the first num rows. |
Conversion methods
Writing data
| Method | Description |
| --- | --- |
| writeTo(table) | Creates a write configuration builder for v2 sources. |
| mergeInto(table, condition) | Merges a set of updates, insertions, and deletions based on a source table into a target table. |
DataFrame comparison
| Method | Description |
| --- | --- |
| sameSemantics(other) | Returns True when the logical query plans inside both DataFrames are equal. |
| semanticHash() | Returns a hash code of the logical query plan of this DataFrame. |
| Method | Description |
| --- | --- |
| inputFiles() | Returns a best-effort snapshot of the files that compose this DataFrame. |
Advanced SQL features
| Method | Description |
| --- | --- |
| isLocal() | Returns True if the collect and take methods can be run locally (without any Spark executors). |
| asTable() | Converts the DataFrame into a TableArg object, which can be used as a table argument in a table-valued function (TVF). |
| scalar() | Returns a Column object for a SCALAR subquery containing exactly one row and one column. |
| exists() | Returns a Column object for an EXISTS subquery. |
Examples
Basic DataFrame operations
# Create a DataFrame
people = spark.createDataFrame([
    {"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
    {"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
    {"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
    {"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
Aggregation and grouping
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
    F.avg("salary").alias("avg_salary"),
    F.max("age").alias("max_age")
).show()
Joins
# Create another DataFrame
department = spark.createDataFrame([
    {"id": 1, "name": "PySpark"},
    {"id": 2, "name": "ML"},
    {"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
# Chained operations
result = people.filter(people.age > 30) \
    .join(department, people.deptId == department.id) \
    .groupBy(department.name, "gender") \
    .agg({"salary": "avg", "age": "max"}) \
    .sort("max(age)")
result.show()