分组到命名列的数据的分布式集合。
DataFrame 等效于 Spark SQL 中的关系表,可以使用 SparkSession 中的各种函数创建。
重要
不应使用构造函数直接创建数据帧。
支持 Spark Connect
属性
| 财产 | 说明 |
|---|---|
sparkSession |
返回创建此数据帧 的 SparkSession 。 |
rdd |
以 RDD 形式返回内容(仅限经典模式)。 |
na |
返回用于处理缺失值的 DataFrameNaFunctions 。 |
stat |
返回统计信息函数 的 DataFrameStatFunctions 。 |
write |
用于将非流式处理数据帧的内容保存到外部存储的接口。 |
writeStream |
用于将流式处理数据帧的内容保存到外部存储的接口。 |
schema |
返回此 DataFrame 的架构作为结构类型。 |
dtypes |
以列表形式返回所有列名及其数据类型。 |
columns |
检索数据帧中所有列的名称作为列表。 |
storageLevel |
获取 DataFrame 的当前存储级别。 |
isStreaming |
如果此数据帧包含一个或多个源,则返回 True,这些源在数据到达时会持续返回数据。 |
executionInfo |
在执行查询后返回 ExecutionInfo 对象。 |
plot |
返回用于绘制函数的 PySparkPlotAccessor 。 |
方法
数据查看和检查
| 方法 | 说明 |
|---|---|
toJSON(use_unicode) |
将数据帧转换为字符串或数据帧的 RDD。 |
printSchema(level) |
以树格式输出架构。 |
explain(extended, mode) |
将(逻辑和物理)计划打印到控制台以进行调试。 |
show(n, truncate, vertical) |
将 DataFrame 的前 n 行输出到控制台。 |
collect() |
以行列表的形式返回 DataFrame 中的所有记录。 |
toLocalIterator(prefetchPartitions) |
返回一个迭代器,其中包含此数据帧中的所有行。 |
take(num) |
以行列表的形式返回前 num 行。 |
tail(num) |
以行列表的形式返回最后一个数字行。 |
head(n) |
返回前 n 行。 |
first() |
以行的形式返回第一行。 |
count() |
返回此数据帧中的行数。 |
isEmpty() |
检查 DataFrame 是否为空并返回布尔值。 |
describe(*cols) |
计算数值列和字符串列的基本统计信息。 |
summary(*statistics) |
计算数值列和字符串列的指定统计信息。 |
临时视图
| 方法 | 说明 |
|---|---|
createTempView(name) |
使用此数据帧创建本地临时视图。 |
createOrReplaceTempView(name) |
使用此数据帧创建或替换本地临时视图。 |
createGlobalTempView(name) |
使用此数据帧创建全局临时视图。 |
createOrReplaceGlobalTempView(name) |
使用给定的名称创建或替换全局临时视图。 |
选择和投影
| 方法 | 说明 |
|---|---|
select(*cols) |
投影一组表达式并返回新的 DataFrame。 |
selectExpr(*expr) |
投影一组 SQL 表达式并返回新的 DataFrame。 |
filter(condition) |
使用给定条件筛选行。 |
where(condition) |
筛选器的别名。 |
drop(*cols) |
返回没有指定列的新 DataFrame。 |
toDF(*cols) |
返回具有新指定列名称的新 DataFrame。 |
withColumn(colName, col) |
通过添加列或替换具有相同名称的现有列来返回新的 DataFrame。 |
withColumns(*colsMap) |
通过添加多个列或替换具有相同名称的现有列来返回新的 DataFrame。 |
withColumnRenamed(existing, new) |
通过重命名现有列返回新的 DataFrame。 |
withColumnsRenamed(colsMap) |
通过重命名多个列返回新的 DataFrame。 |
withMetadata(columnName, metadata) |
通过使用元数据更新现有列来返回新的 DataFrame。 |
metadataColumn(colName) |
基于其逻辑列名称选择元数据列,并将其作为列返回。 |
colRegex(colName) |
根据指定为正则表达式的列名称选择列,并将其作为列返回。 |
排序和排序
| 方法 | 说明 |
|---|---|
sort(*cols, **kwargs) |
返回按指定列排序的新 DataFrame。 |
orderBy(*cols, **kwargs) |
排序的别名。 |
sortWithinPartitions(*cols, **kwargs) |
返回一个新的 DataFrame,每个分区都按指定的列排序。 |
聚合和分组
| 方法 | 说明 |
|---|---|
groupBy(*cols) |
按指定列对数据帧进行分组,以便可以对它们执行聚合。 |
rollup(*cols) |
使用指定的列为当前数据帧创建多维汇总。 |
cube(*cols) |
使用指定的列为当前数据帧创建多维多维多维数据集。 |
groupingSets(groupingSets, *cols) |
使用指定的分组集为当前数据帧创建多维聚合。 |
agg(*exprs) |
对不包含组的整个数据帧进行聚合(df.groupBy()。agg()的速记。 |
observe(observation, *exprs) |
定义要在 DataFrame 上观察的(已命名)指标。 |
联接
| 方法 | 说明 |
|---|---|
join(other, on, how) |
使用给定联接表达式与其他 DataFrame 联接。 |
crossJoin(other) |
返回具有另一个 DataFrame 的笛卡尔积。 |
lateralJoin(other, on, how) |
使用给定联接表达式将横向联接与另一个 DataFrame 联接。 |
设置运算
| 方法 | 说明 |
|---|---|
union(other) |
返回一个新的 DataFrame,其中包含此数据帧和另一个 DataFrame 中的行的并集。 |
unionByName(other, allowMissingColumns) |
返回一个新的数据帧,其中包含此数据帧和另一个 DataFrame 中的行的联合。 |
intersect(other) |
返回一个新的数据帧,该数据帧仅包含此 DataFrame 和另一个 DataFrame 中的行。 |
intersectAll(other) |
返回一个新的 DataFrame,其中包含此 DataFrame 和另一个 DataFrame 中的行,同时保留重复项。 |
subtract(other) |
返回一个新的 DataFrame,其中包含此 DataFrame 中的行,但不在另一个 DataFrame 中。 |
exceptAll(other) |
返回一个新的数据帧,其中包含此数据帧中的行,但不在另一个 DataFrame 中保留重复项。 |
去重
| 方法 | 说明 |
|---|---|
distinct() |
返回一个新的 DataFrame,其中包含此 DataFrame 中的非重复行。 |
dropDuplicates(subset) |
返回删除了重复行的新 DataFrame,可以选择只考虑某些列。 |
dropDuplicatesWithinWatermark(subset) |
返回删除了重复行的新 DataFrame,可以选择只考虑水印中的某些列。 |
采样和拆分
| 方法 | 说明 |
|---|---|
sample(withReplacement, fraction, seed) |
返回此 DataFrame 的采样子集。 |
sampleBy(col, fractions, seed) |
根据每个层中给出的分数返回分层样本,而不用替换。 |
randomSplit(weights, seed) |
使用提供的权重随机拆分此数据帧。 |
Partitioning
| 方法 | 说明 |
|---|---|
coalesce(numPartitions) |
返回一个新的包含 numPartitions 分区的数据帧。 |
repartition(numPartitions, *cols) |
返回由给定分区表达式分区的新 DataFrame。 |
repartitionByRange(numPartitions, *cols) |
返回由给定分区表达式分区的新 DataFrame。 |
repartitionById(numPartitions, partitionIdCol) |
返回由给定分区 ID 表达式分区的新 DataFrame。 |
重塑
| 方法 | 说明 |
|---|---|
unpivot(ids, values, variableColumnName, valueColumnName) |
将数据帧从宽格式撤消为长格式。 |
melt(ids, values, variableColumnName, valueColumnName) |
逆透视的别名。 |
transpose(indexColumn) |
转置数据帧,使指定索引列中的值成为新列。 |
缺少数据处理
| 方法 | 说明 |
|---|---|
dropna(how, thresh, subset) |
返回一个新的数据帧省略具有 null 或 NaN 值的行。 |
fillna(value, subset) |
返回一个新的 DataFrame,其中空值用新值填充。 |
replace(to_replace, value, subset) |
返回一个新的 DataFrame,将值替换为另一个值。 |
统计函数
| 方法 | 说明 |
|---|---|
approxQuantile(col, probabilities, relativeError) |
计算数据帧的数字列的近似分量。 |
corr(col1, col2, method) |
将 DataFrame 的两列的关联计算为双精度值。 |
cov(col1, col2) |
计算由给定列的名称指定的给定列的示例协变。 |
crosstab(col1, col2) |
计算给定列的成对频率表。 |
freqItems(cols, support) |
查找列的常见项,可能带有误报。 |
架构作
| 方法 | 说明 |
|---|---|
to(schema) |
返回一个新的 DataFrame,其中每一行进行协调以匹配指定的架构。 |
alias(alias) |
返回具有别名集的新 DataFrame。 |
迭代
| 方法 | 说明 |
|---|---|
foreach(f) |
将 f 函数应用于此数据帧的所有行。 |
foreachPartition(f) |
将 f 函数应用于此数据帧的每个分区。 |
缓存和持久性
| 方法 | 说明 |
|---|---|
cache() |
保留默认存储级别(MEMORY_AND_DISK_DESER)的数据帧。 |
persist(storageLevel) |
设置存储级别,以跨作保留 DataFrame 的内容。 |
unpersist(blocking) |
将 DataFrame 标记为非持久性,并从内存和磁盘中删除其所有块。 |
检查点
| 方法 | 说明 |
|---|---|
checkpoint(eager) |
返回此 DataFrame 的检查点版本。 |
localCheckpoint(eager, storageLevel) |
返回此 DataFrame 的本地检查点版本。 |
流式处理作
| 方法 | 说明 |
|---|---|
withWatermark(eventTime, delayThreshold) |
定义此数据帧的事件时间水印。 |
优化提示
| 方法 | 说明 |
|---|---|
hint(name, *parameters) |
指定当前 DataFrame 上的一些提示。 |
限制和偏移量
| 方法 | 说明 |
|---|---|
limit(num) |
将结果计数限制为指定的数字。 |
offset(num) |
通过跳过前 n 行返回新的 DataFrame。 |
高级转换
| 方法 | 说明 |
|---|---|
transform(func, *args, **kwargs) |
返回新的 DataFrame。 链接自定义转换的简洁语法。 |
转换方法
| 方法 | 说明 |
|---|---|
toPandas() |
返回此数据帧的内容作为 Pandas pandas。DataFrame。 |
toArrow() |
以 PyArrow pyarrow 形式返回此数据帧的内容。表。 |
pandas_api(index_col) |
将现有数据帧转换为 pandas-on-Spark 数据帧。 |
mapInPandas(func, schema, barrier, profile) |
使用 Python 本机函数映射当前数据帧中的批处理迭代器。 |
mapInArrow(func, schema, barrier, profile) |
使用在 pyarrow 上执行的 Python 本机函数映射当前数据帧中的批处理迭代器。RecordBatch。 |
写入数据
| 方法 | 说明 |
|---|---|
writeTo(table) |
为 v2 源创建写入配置生成器。 |
mergeInto(table, condition) |
基于源表将一组更新、插入和删除合并到目标表中。 |
数据帧比较
| 方法 | 说明 |
|---|---|
sameSemantics(other) |
当两个 DataFrame 内的逻辑查询计划相等时返回 True。 |
semanticHash() |
针对此 DataFrame 返回逻辑查询计划的哈希代码。 |
元数据和文件信息
| 方法 | 说明 |
|---|---|
inputFiles() |
返回构成此数据帧的文件的最佳工作快照。 |
高级 SQL 功能
| 方法 | 说明 |
|---|---|
isLocal() |
如果可在本地运行 collect 和 take 方法,则返回 True。 |
asTable() |
将 DataFrame 转换为 TableArg 对象,该对象可用作 TVF 中的表参数。 |
scalar() |
返回包含一行和一列的 SCALAR 子查询的 Column 对象。 |
exists() |
返回 EXISTS 子查询的 Column 对象。 |
示例
基本数据帧作
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
聚合和分组
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()
联接
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
复杂转换
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()