共用方式為


DataFrame 類別

一個分散式的資料集合,並被分組成命名欄位。

DataFrame 相當於 Spark SQL 中的關聯式資料表,並可透過 SparkSession 中的各種函式建立。

這很重要

DataFrame 不應該直接用建構子來建立。

支援 Spark Connect

屬性

房產 說明
sparkSession 回傳創建此資料框架的 SparkSession
rdd 將內容以 Row 的 RDD 格式回傳(僅限經典模式)。
na 回傳一個 DataFrameNaFunctions ,用於處理遺失值。
stat 回傳統計函數的 DataFrameStatFunctions
write 將非串流資料幀內容儲存到外部儲存的介面。
writeStream 將串流資料幀內容儲存到外部儲存的介面。
schema 回傳此 DataFrame 的結構型態(StructType)。
dtypes 以清單形式回傳所有欄位名稱及其資料型態。
columns 以清單形式取得 DataFrame 中所有欄位的名稱。
storageLevel 取得資料幀目前的儲存等級。
isStreaming 若此資料框架包含一個或多個持續回傳資料的來源,則回傳 True。
executionInfo 查詢執行後會回傳一個 ExecutionInfo 物件。
plot 回傳一個用於繪圖函式的 PySparkPlotAccessor

方法

資料檢視與檢查

方法 說明
toJSON(use_unicode) 將資料幀轉換為字串或資料幀的 RDD。
printSchema(level) 會以樹狀格式列印結構。
explain(extended, mode) 將(邏輯與物理)平面圖印到主控台以便除錯。
show(n, truncate, vertical) 將資料框的前 n 行列印到主控台。
collect() 會以列清單的形式回傳資料框中的所有紀錄。
toLocalIterator(prefetchPartitions) 回傳一個包含此資料框架中所有列的迭代器。
take(num) 會將前幾列的數字列回傳為 Row 的清單。
tail(num) 會將最後幾列的數列回傳為 Row 的清單。
head(n) 返回前 n 列。
first() 將第一列回傳為一列。
count() 回傳此資料框架中的列數。
isEmpty() 檢查 DataFrame 是否為空,並回傳一個布林值。
describe(*cols) 計算數值與字串欄位的基本統計資料。
summary(*statistics) 計算數值與字串欄位的指定統計量。

臨時視圖

方法 說明
createTempView(name) 用這個 DataFrame 建立一個本地的暫存視圖。
createOrReplaceTempView(name) 建立或替換本地的暫存視圖為此資料框架。
createGlobalTempView(name) 用這個資料框架建立一個全域暫時視圖。
createOrReplaceGlobalTempView(name) 使用該名稱建立或替換一個全域暫時視圖。

選擇與預測

方法 說明
select(*cols) 投影一組表達式並回傳新的 DataFrame。
selectExpr(*expr) 投影一組 SQL 表達式並回傳新的 DataFrame。
filter(condition) 利用給定條件篩選列。
where(condition) 過濾器的別名。
drop(*cols) 回傳一個未指定欄位的新資料框。
toDF(*cols) 回傳一個新的 DataFrame,並指定了新的欄位名稱。
withColumn(colName, col) 透過新增欄位或替換同名欄位,回傳新的 DataFrame。
withColumns(*colsMap) 透過新增多欄或替換同名欄位,回傳新的 DataFrame。
withColumnRenamed(existing, new) 透過重新命名現有欄位回傳新的資料框架。
withColumnsRenamed(colsMap) 透過重新命名多欄回傳新的 DataFrame。
withMetadata(columnName, metadata) 透過更新現有欄位並加入元資料,回傳新的 DataFrame。
metadataColumn(colName) 根據邏輯欄位名稱選擇一個元資料欄位,並以欄位回傳。
colRegex(colName) 根據正則表達式指定的欄位名稱選擇欄位,並回傳為欄位。

排序與排序

方法 說明
sort(*cols, **kwargs) 回傳一個依指定欄位排序的新資料框架。
orderBy(*cols, **kwargs) 別名,代表分類。
sortWithinPartitions(*cols, **kwargs) 回傳一個新的 DataFrame,每個分割區依指定欄位排序。

聚合與分組

方法 說明
groupBy(*cols) 依指定欄位將資料框架分組,以便對其進行聚合。
rollup(*cols) 利用指定的欄位為目前的資料框架建立多維彙整。
cube(*cols) 利用指定的欄位為目前的資料框架建立一個多維立方體。
groupingSets(groupingSets, *cols) 利用指定的分組集為目前資料框架建立多維聚合。
agg(*exprs) 在整個資料框架上進行聚合,不使用群組(df.groupBy().agg()的簡稱)。
observe(observation, *exprs) 定義(命名的)指標以在資料框架上觀察。

Joins

方法 說明
join(other, on, how) 與另一個 DataFrame 連接,使用給定的 join 表達式。
crossJoin(other) 回傳笛卡兒積與另一個 DataFrame。
lateralJoin(other, on, how) 側向連接與另一個 DataFrame 進行,使用給定的連接表達式。

設定作業

方法 說明
union(other) 回傳一個新的 DataFrame,包含此 DataFrame 與另一個 DataFrame 中列的聯集。
unionByName(other, allowMissingColumns) 回傳一個新的資料框架,包含該資料框架與另一個資料框架中列的聯集。
intersect(other) 回傳一個新的 DataFrame,該 DataFrame 只包含該 DataFrame 與另一個 DataFrame 的列。
intersectAll(other) 回傳一個新的 DataFrame,包含該 DataFrame 與另一個 DataFrame 的列,同時保留重複的部分。
subtract(other) 回傳一個新的 DataFrame,包含該 DataFrame 中的列,但不包含其他 DataFrame 中的列。
exceptAll(other) 回傳一個新的 DataFrame,包含該 DataFrame 中的列,但不包含其他 DataFrame 中的列,同時保留重複的部分。

Deduplication

方法 說明
distinct() 回傳一個新的 DataFrame,包含該 DataFrame 中不同列的列。
dropDuplicates(subset) 回傳一個新的 DataFrame,移除重複列,且可選擇只考慮特定欄位。
dropDuplicatesWithinWatermark(subset) 回傳一個新的 DataFrame,移除重複列,且可選擇只考慮特定欄位,置於浮水印內。

取樣與分割

方法 說明
sample(withReplacement, fraction, seed) 回傳該資料框架的取樣子集。
sampleBy(col, fractions, seed) 根據每個層的分數,回傳一個無置換的分層樣本。
randomSplit(weights, seed) 隨機將此資料幀與提供的權重分割。

Partitioning

方法 說明
coalesce(numPartitions) 回傳一個新的 DataFrame,裡面的 numPartitions 分割區正好如此。
repartition(numPartitions, *cols) 回傳一個新的 DataFrame,並依給定的分割表達式劃分。
repartitionByRange(numPartitions, *cols) 回傳一個新的 DataFrame,並依給定的分割表達式劃分。
repartitionById(numPartitions, partitionIdCol) 回傳一個新的 DataFrame,並依據給定的 partition ID 表達式進行分割。

重塑

方法 說明
unpivot(ids, values, variableColumnName, valueColumnName) 將資料框架從寬格式解構為長格式。
melt(ids, values, variableColumnName, valueColumnName) Unpivot的別名。
transpose(indexColumn) 將資料框架轉置,使指定索引欄的值成為新的欄位。

遺漏資料處理

方法 說明
dropna(how, thresh, subset) 回傳一個新的 DataFrame,省略 null 或 NaN 值的列。
fillna(value, subset) 回傳一個新的 DataFrame,空值被填入新值。
replace(to_replace, value, subset) 回傳一個新的 DataFrame,將一個值替換為另一個值。

統計函數

方法 說明
approxQuantile(col, probabilities, relativeError) 計算資料框架數值欄位的近似分位數。
corr(col1, col2, method) 計算資料框中兩欄的相關性,作為雙重值。
cov(col1, col2) 計算給定欄位的樣本協方差,並以名稱表示。
crosstab(col1, col2) 計算給定欄位的成對頻率表。
freqItems(cols, support) 經常出現欄位項目,可能有誤判。

結構操作

方法 說明
to(schema) 回傳一個新的資料框架,每一列都會對照到指定的結構。
alias(alias) 回傳一個帶有別名設定的新 DataFrame。

反覆運算

方法 說明
foreach(f) 將 f 函數套用到這個資料框架的所有列。
foreachPartition(f) 對此資料框架的每個分割套用 f 函數。

快取與持久化

方法 說明
cache() 以預設儲存層級(MEMORY_AND_DISK_DESER)持續保存 DataFrame。
persist(storageLevel) 設定儲存層級以在各操作間持久保存資料框的內容。
unpersist(blocking) 將 DataFrame 標記為非持久性,並從記憶體和磁碟中移除所有與其相關區塊。

檢查點處理

方法 說明
checkpoint(eager) 回傳這個資料框架的檢查點版本。
localCheckpoint(eager, storageLevel) 回傳此資料框的本地檢查點版本。

串流業務

方法 說明
withWatermark(eventTime, delayThreshold) 定義了此資料框架的事件時間水印。

優化提示

方法 說明
hint(name, *parameters) 指定目前資料框架的某些提示。

限制與抵消

方法 說明
limit(num) 限制結果計數為指定數量。
offset(num) 跳過前 n 列,回傳新的 DataFrame。

進階轉換

方法 說明
transform(func, *args, **kwargs) 回傳一個新的 DataFrame。 簡潔的語法,用於串接自訂轉換。

轉換方法

方法 說明
toPandas() 回傳此資料幀的內容為 Pandas pandas。DataFrame。
toArrow() 回傳此資料框的內容為 PyArrow pyarrow。桌子。
pandas_api(index_col) 將現有的資料框架轉換成熊貓火花資料框。
mapInPandas(func, schema, barrier, profile) 利用 Python 原生函式映射目前 DataFrame 中的批次迭代器。
mapInArrow(func, schema, barrier, profile) 在目前的資料框架中,使用 Python 原生函式(在 Pyarrow 上執行)映射批次迭代器。RecordBatch。

寫入資料

方法 說明
writeTo(table) 為 v2 原始碼建立一個寫入設定建構器。
mergeInto(table, condition) 將一組基於來源資料表的更新、插入與刪除合併到目標資料表中。

DataFrame 比較

方法 說明
sameSemantics(other) 當兩個資料框架內的邏輯查詢計畫相等時,回傳為真。
semanticHash() 回傳邏輯查詢計畫的雜湊碼,該資料框架對照此資料框架。

元資料與檔案資訊

方法 說明
inputFiles() 回傳組成此資料框架檔案的盡力而為快照。

進階 SQL 功能

方法 說明
isLocal() 若 collect and take 方法可在本地執行,則回傳為 True。
asTable() 將 DataFrame 轉換成 TableArg 物件,可用作 TVF 中的表格參數。
scalar() 回傳一個包含一列一欄的 SCALAR 子查詢欄位物件。
exists() 回傳一個 EXISTS 子查詢的欄位物件。

Examples

基本資料框架操作

# Create a DataFrame
people = spark.createDataFrame([
    {"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
    {"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
    {"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
    {"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])

# Select columns
people.select("name", "age").show()

# Filter rows
people.filter(people.age > 30).show()

# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()

聚合與分組

# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()

# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
    F.avg("salary").alias("avg_salary"),
    F.max("age").alias("max_age")
).show()

Joins

# Create another DataFrame
department = spark.createDataFrame([
    {"id": 1, "name": "PySpark"},
    {"id": 2, "name": "ML"},
    {"id": 3, "name": "Spark SQL"}
])

# Join DataFrames
people.join(department, people.deptId == department.id).show()

複雜的轉換

# Chained operations
result = people.filter(people.age > 30) \\
    .join(department, people.deptId == department.id) \\
    .groupBy(department.name, "gender") \\
    .agg({"salary": "avg", "age": "max"}) \\
    .sort("max(age)")
result.show()