
Window class

Utility functions for defining windows over DataFrames.

Supports Spark Connect.

Class attributes

Attribute  Description
unboundedPreceding  Boundary value representing the start of an unbounded window frame.
unboundedFollowing  Boundary value representing the end of an unbounded window frame.
currentRow  Boundary value representing the current row in a window frame.

Methods

Method  Description
orderBy(*cols)  Creates a WindowSpec with the ordering defined.
partitionBy(*cols)  Creates a WindowSpec with the partitioning defined.
rangeBetween(start, end)  Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive), as range-based offsets relative to the current row's ORDER BY value.
rowsBetween(start, end)  Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive), as row-based offsets relative to the current row.

Notes

When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.

Examples

Basic window with ordering and a row-based frame

from pyspark.sql import Window

# ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

Partitioned window with a range-based frame

from pyspark.sql import Window

# PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)

Row number within each partition

from pyspark.sql import Window, functions as sf

# Assumes an active SparkSession bound to `spark`
df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)

# Show row number ordered by id within each category partition
window = Window.partitionBy("category").orderBy("id")
df.withColumn("row_number", sf.row_number().over(window)).show()

Running sum with a row-based frame

from pyspark.sql import Window, functions as sf

df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)

# Sum id values from the current row to the next row within each partition
window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
df.withColumn("sum", sf.sum("id").over(window)).sort("id", "category", "sum").show()

Running sum with a range-based frame

from pyspark.sql import Window, functions as sf

df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)

# Sum id values from the current id value to id + 1 within each partition
window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
df.withColumn("sum", sf.sum("id").over(window)).sort("id", "category").show()