一個分散式的資料集合,並被分組成命名欄位。
DataFrame 相當於 Spark SQL 中的關聯式資料表,並可透過 SparkSession 中的各種函式建立。
這很重要
DataFrame 不應該直接用建構子來建立。
支援 Spark Connect
屬性
| 房產 | 說明 |
|---|---|
sparkSession |
回傳創建此資料框架的 SparkSession 。 |
rdd |
將內容以 Row 的 RDD 格式回傳(僅限經典模式)。 |
na |
回傳一個 DataFrameNaFunctions ,用於處理遺失值。 |
stat |
回傳統計函數的 DataFrameStatFunctions 。 |
write |
將非串流資料幀內容儲存到外部儲存的介面。 |
writeStream |
將串流資料幀內容儲存到外部儲存的介面。 |
schema |
回傳此 DataFrame 的結構型態(StructType)。 |
dtypes |
以清單形式回傳所有欄位名稱及其資料型態。 |
columns |
以清單形式取得 DataFrame 中所有欄位的名稱。 |
storageLevel |
取得資料幀目前的儲存等級。 |
isStreaming |
若此資料框架包含一個或多個持續回傳資料的來源,則回傳 True。 |
executionInfo |
查詢執行後會回傳一個 ExecutionInfo 物件。 |
plot |
回傳一個用於繪圖函式的 PySparkPlotAccessor 。 |
方法
資料檢視與檢查
| 方法 | 說明 |
|---|---|
toJSON(use_unicode) |
將資料幀轉換為字串或資料幀的 RDD。 |
printSchema(level) |
會以樹狀格式列印結構。 |
explain(extended, mode) |
將(邏輯與物理)平面圖印到主控台以便除錯。 |
show(n, truncate, vertical) |
將資料框的前 n 行列印到主控台。 |
collect() |
會以列清單的形式回傳資料框中的所有紀錄。 |
toLocalIterator(prefetchPartitions) |
回傳一個包含此資料框架中所有列的迭代器。 |
take(num) |
會將前幾列的數字列回傳為 Row 的清單。 |
tail(num) |
會將最後幾列的數列回傳為 Row 的清單。 |
head(n) |
返回前 n 列。 |
first() |
將第一列回傳為一列。 |
count() |
回傳此資料框架中的列數。 |
isEmpty() |
檢查 DataFrame 是否為空,並回傳一個布林值。 |
describe(*cols) |
計算數值與字串欄位的基本統計資料。 |
summary(*statistics) |
計算數值與字串欄位的指定統計量。 |
臨時視圖
| 方法 | 說明 |
|---|---|
createTempView(name) |
用這個 DataFrame 建立一個本地的暫存視圖。 |
createOrReplaceTempView(name) |
建立或替換本地的暫存視圖為此資料框架。 |
createGlobalTempView(name) |
用這個資料框架建立一個全域暫時視圖。 |
createOrReplaceGlobalTempView(name) |
使用該名稱建立或替換一個全域暫時視圖。 |
選擇與預測
| 方法 | 說明 |
|---|---|
select(*cols) |
投影一組表達式並回傳新的 DataFrame。 |
selectExpr(*expr) |
投影一組 SQL 表達式並回傳新的 DataFrame。 |
filter(condition) |
利用給定條件篩選列。 |
where(condition) |
過濾器的別名。 |
drop(*cols) |
回傳一個未指定欄位的新資料框。 |
toDF(*cols) |
回傳一個新的 DataFrame,並指定了新的欄位名稱。 |
withColumn(colName, col) |
透過新增欄位或替換同名欄位,回傳新的 DataFrame。 |
withColumns(*colsMap) |
透過新增多欄或替換同名欄位,回傳新的 DataFrame。 |
withColumnRenamed(existing, new) |
透過重新命名現有欄位回傳新的資料框架。 |
withColumnsRenamed(colsMap) |
透過重新命名多欄回傳新的 DataFrame。 |
withMetadata(columnName, metadata) |
透過更新現有欄位並加入元資料,回傳新的 DataFrame。 |
metadataColumn(colName) |
根據邏輯欄位名稱選擇一個元資料欄位,並以欄位回傳。 |
colRegex(colName) |
根據正則表達式指定的欄位名稱選擇欄位,並回傳為欄位。 |
排序與排序
| 方法 | 說明 |
|---|---|
sort(*cols, **kwargs) |
回傳一個依指定欄位排序的新資料框架。 |
orderBy(*cols, **kwargs) |
別名,代表分類。 |
sortWithinPartitions(*cols, **kwargs) |
回傳一個新的 DataFrame,每個分割區依指定欄位排序。 |
聚合與分組
| 方法 | 說明 |
|---|---|
groupBy(*cols) |
依指定欄位將資料框架分組,以便對其進行聚合。 |
rollup(*cols) |
利用指定的欄位為目前的資料框架建立多維彙整。 |
cube(*cols) |
利用指定的欄位為目前的資料框架建立一個多維立方體。 |
groupingSets(groupingSets, *cols) |
利用指定的分組集為目前資料框架建立多維聚合。 |
agg(*exprs) |
在整個資料框架上進行聚合,不使用群組(df.groupBy().agg()的簡稱)。 |
observe(observation, *exprs) |
定義(命名的)指標以在資料框架上觀察。 |
Joins
| 方法 | 說明 |
|---|---|
join(other, on, how) |
與另一個 DataFrame 連接,使用給定的 join 表達式。 |
crossJoin(other) |
回傳笛卡兒積與另一個 DataFrame。 |
lateralJoin(other, on, how) |
側向連接與另一個 DataFrame 進行,使用給定的連接表達式。 |
設定作業
| 方法 | 說明 |
|---|---|
union(other) |
回傳一個新的 DataFrame,包含此 DataFrame 與另一個 DataFrame 中列的聯集。 |
unionByName(other, allowMissingColumns) |
回傳一個新的資料框架,包含該資料框架與另一個資料框架中列的聯集。 |
intersect(other) |
回傳一個新的 DataFrame,該 DataFrame 只包含該 DataFrame 與另一個 DataFrame 的列。 |
intersectAll(other) |
回傳一個新的 DataFrame,包含該 DataFrame 與另一個 DataFrame 的列,同時保留重複的部分。 |
subtract(other) |
回傳一個新的 DataFrame,包含該 DataFrame 中的列,但不包含其他 DataFrame 中的列。 |
exceptAll(other) |
回傳一個新的 DataFrame,包含該 DataFrame 中的列,但不包含其他 DataFrame 中的列,同時保留重複的部分。 |
Deduplication
| 方法 | 說明 |
|---|---|
distinct() |
回傳一個新的 DataFrame,包含該 DataFrame 中不同列的列。 |
dropDuplicates(subset) |
回傳一個新的 DataFrame,移除重複列,且可選擇只考慮特定欄位。 |
dropDuplicatesWithinWatermark(subset) |
回傳一個新的 DataFrame,移除重複列,且可選擇只考慮特定欄位,置於浮水印內。 |
取樣與分割
| 方法 | 說明 |
|---|---|
sample(withReplacement, fraction, seed) |
回傳該資料框架的取樣子集。 |
sampleBy(col, fractions, seed) |
根據每個層的分數,回傳一個無置換的分層樣本。 |
randomSplit(weights, seed) |
隨機將此資料幀與提供的權重分割。 |
Partitioning
| 方法 | 說明 |
|---|---|
coalesce(numPartitions) |
回傳一個新的 DataFrame,裡面的 numPartitions 分割區正好如此。 |
repartition(numPartitions, *cols) |
回傳一個新的 DataFrame,並依給定的分割表達式劃分。 |
repartitionByRange(numPartitions, *cols) |
回傳一個新的 DataFrame,並依給定的分割表達式劃分。 |
repartitionById(numPartitions, partitionIdCol) |
回傳一個新的 DataFrame,並依據給定的 partition ID 表達式進行分割。 |
重塑
| 方法 | 說明 |
|---|---|
unpivot(ids, values, variableColumnName, valueColumnName) |
將資料框架從寬格式解構為長格式。 |
melt(ids, values, variableColumnName, valueColumnName) |
Unpivot的別名。 |
transpose(indexColumn) |
將資料框架轉置,使指定索引欄的值成為新的欄位。 |
遺漏資料處理
| 方法 | 說明 |
|---|---|
dropna(how, thresh, subset) |
回傳一個新的 DataFrame,省略 null 或 NaN 值的列。 |
fillna(value, subset) |
回傳一個新的 DataFrame,空值被填入新值。 |
replace(to_replace, value, subset) |
回傳一個新的 DataFrame,將一個值替換為另一個值。 |
統計函數
| 方法 | 說明 |
|---|---|
approxQuantile(col, probabilities, relativeError) |
計算資料框架數值欄位的近似分位數。 |
corr(col1, col2, method) |
計算資料框中兩欄的相關性,作為雙重值。 |
cov(col1, col2) |
計算給定欄位的樣本協方差,並以名稱表示。 |
crosstab(col1, col2) |
計算給定欄位的成對頻率表。 |
freqItems(cols, support) |
經常出現欄位項目,可能有誤判。 |
結構操作
| 方法 | 說明 |
|---|---|
to(schema) |
回傳一個新的資料框架,每一列都會對照到指定的結構。 |
alias(alias) |
回傳一個帶有別名設定的新 DataFrame。 |
反覆運算
| 方法 | 說明 |
|---|---|
foreach(f) |
將 f 函數套用到這個資料框架的所有列。 |
foreachPartition(f) |
對此資料框架的每個分割套用 f 函數。 |
快取與持久化
| 方法 | 說明 |
|---|---|
cache() |
以預設儲存層級(MEMORY_AND_DISK_DESER)持續保存 DataFrame。 |
persist(storageLevel) |
設定儲存層級以在各操作間持久保存資料框的內容。 |
unpersist(blocking) |
將 DataFrame 標記為非持久性,並從記憶體和磁碟中移除所有與其相關區塊。 |
檢查點處理
| 方法 | 說明 |
|---|---|
checkpoint(eager) |
回傳這個資料框架的檢查點版本。 |
localCheckpoint(eager, storageLevel) |
回傳此資料框的本地檢查點版本。 |
串流業務
| 方法 | 說明 |
|---|---|
withWatermark(eventTime, delayThreshold) |
定義了此資料框架的事件時間水印。 |
優化提示
| 方法 | 說明 |
|---|---|
hint(name, *parameters) |
指定目前資料框架的某些提示。 |
限制與抵消
| 方法 | 說明 |
|---|---|
limit(num) |
限制結果計數為指定數量。 |
offset(num) |
跳過前 n 列,回傳新的 DataFrame。 |
進階轉換
| 方法 | 說明 |
|---|---|
transform(func, *args, **kwargs) |
回傳一個新的 DataFrame。 簡潔的語法,用於串接自訂轉換。 |
轉換方法
| 方法 | 說明 |
|---|---|
toPandas() |
回傳此資料幀的內容為 Pandas pandas。DataFrame。 |
toArrow() |
回傳此資料框的內容為 PyArrow pyarrow。桌子。 |
pandas_api(index_col) |
將現有的資料框架轉換成熊貓火花資料框。 |
mapInPandas(func, schema, barrier, profile) |
利用 Python 原生函式映射目前 DataFrame 中的批次迭代器。 |
mapInArrow(func, schema, barrier, profile) |
在目前的資料框架中,使用 Python 原生函式(在 Pyarrow 上執行)映射批次迭代器。RecordBatch。 |
寫入資料
| 方法 | 說明 |
|---|---|
writeTo(table) |
為 v2 原始碼建立一個寫入設定建構器。 |
mergeInto(table, condition) |
將一組基於來源資料表的更新、插入與刪除合併到目標資料表中。 |
DataFrame 比較
| 方法 | 說明 |
|---|---|
sameSemantics(other) |
當兩個資料框架內的邏輯查詢計畫相等時,回傳為真。 |
semanticHash() |
回傳邏輯查詢計畫的雜湊碼,該資料框架對照此資料框架。 |
元資料與檔案資訊
| 方法 | 說明 |
|---|---|
inputFiles() |
回傳組成此資料框架檔案的盡力而為快照。 |
進階 SQL 功能
| 方法 | 說明 |
|---|---|
isLocal() |
若 collect and take 方法可在本地執行,則回傳為 True。 |
asTable() |
將 DataFrame 轉換成 TableArg 物件,可用作 TVF 中的表格參數。 |
scalar() |
回傳一個包含一列一欄的 SCALAR 子查詢欄位物件。 |
exists() |
回傳一個 EXISTS 子查詢的欄位物件。 |
Examples
基本資料框架操作
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
聚合與分組
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()
Joins
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
複雜的轉換
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()