Window class

Utility functions for defining a window in DataFrames.

Supports Spark Connect.

Class attributes

Attribute Description
unboundedPreceding Boundary value representing the first row of an unbounded window frame.
unboundedFollowing Boundary value representing the last row of an unbounded window frame.
currentRow Boundary value representing the current row in a window frame.

Methods

Method Description
orderBy(*cols) Creates a WindowSpec with the ordering defined.
partitionBy(*cols) Creates a WindowSpec with the partitioning defined.
rangeBetween(start, end) Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive), where start and end are range-based offsets relative to the current row's ORDER BY value.
rowsBetween(start, end) Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive), where start and end are row-based offsets relative to the current row.

Notes

When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.
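
To build intuition for the growing default frame used when ordering is defined (unboundedPreceding to currentRow), the same aggregation can be sketched in plain Python: each row aggregates everything from the start of the partition up to and including itself. The helper name `running_sum` is ours for illustration, not part of the API.

```python
# Plain-Python sketch of the default growing frame
# (unboundedPreceding, currentRow): each row aggregates all
# rows from the start of the partition up to itself.
def running_sum(values):
    total, out = 0, []
    for v in values:          # values already in ORDER BY order
        total += v            # the frame grows by one row per step
        out.append(total)
    return out

print(running_sum([1, 2, 3]))  # [1, 3, 6]
```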

Examples

Basic window with ordering and a row frame

from pyspark.sql import Window

# ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

Partitioned window with a range frame

from pyspark.sql import Window

# PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)

Row number within a partition

from pyspark.sql import SparkSession, Window, functions as sf

# Create (or reuse) a SparkSession so the example is self-contained
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)

# Show row number ordered by id within each category partition
window = Window.partitionBy("category").orderBy("id")
df.withColumn("row_number", sf.row_number().over(window)).show()
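
To make the semantics of `row_number` explicit, the result above can be mimicked in plain Python: partition by category, order by id, and number rows from 1 within each partition. This is an illustrative sketch only; the variable names are ours.

```python
from itertools import groupby

# Plain-Python sketch of row_number() over
# Window.partitionBy("category").orderBy("id")
rows = [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")]
numbered = []
for cat, group in groupby(sorted(rows, key=lambda r: (r[1], r[0])),
                          key=lambda r: r[1]):
    for n, (id_, _) in enumerate(group, start=1):  # 1-based per partition
        numbered.append((id_, cat, n))

print(numbered)
# [(1, 'a', 1), (1, 'a', 2), (2, 'a', 3), (1, 'b', 1), (2, 'b', 2), (3, 'b', 3)]
```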

Running sum with a row-based frame

from pyspark.sql import SparkSession, Window, functions as sf

# Create (or reuse) a SparkSession so the example is self-contained
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)

# Sum id values from the current row to the next row within each partition
window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
df.withColumn("sum", sf.sum("id").over(window)).sort("id", "category", "sum").show()
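
As a sanity check on what `rowsBetween(Window.currentRow, 1)` computes, here is a plain-Python sketch (the helper name is ours): within each partition, each row's sum is its own id plus the id of the next row, if one exists.

```python
# Plain-Python sketch of sum("id") over
# rowsBetween(Window.currentRow, 1): the current row plus the next row.
def next_row_sums(ordered_ids):
    out = []
    for i in range(len(ordered_ids)):
        frame = ordered_ids[i:i + 2]  # current row and, if any, the next one
        out.append(sum(frame))
    return out

print(next_row_sums([1, 1, 2]))  # partition "a": [2, 3, 2]
print(next_row_sums([1, 2, 3]))  # partition "b": [3, 5, 3]
```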

Running sum with a range-based frame

from pyspark.sql import SparkSession, Window, functions as sf

# Create (or reuse) a SparkSession so the example is self-contained
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)

# Sum id values from the current id value to id + 1 within each partition
window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
df.withColumn("sum", sf.sum("id").over(window)).sort("id", "category").show()
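
The range-based frame behaves differently from the row-based one when ordering values repeat: `rangeBetween(Window.currentRow, 1)` includes every row whose id falls in [current id, current id + 1], not a fixed number of rows. A plain-Python sketch (the helper name is ours) makes the difference visible on the same data.

```python
# Plain-Python sketch of sum("id") over
# rangeBetween(Window.currentRow, 1): all rows whose ORDER BY value
# lies in [v, v + 1] for the current row's value v.
def range_sums(ordered_ids):
    return [sum(x for x in ordered_ids if v <= x <= v + 1)
            for v in ordered_ids]

print(range_sums([1, 1, 2]))  # partition "a": [4, 4, 2] (both id=1 rows share a frame)
print(range_sums([1, 2, 3]))  # partition "b": [3, 5, 3]
```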