Utility functions for defining windows in DataFrames.
Supports Spark Connect.
Attributes

| Attribute | Description |
|---|---|
| unboundedPreceding | Boundary value that represents the start of an unbounded window frame. |
| unboundedFollowing | Boundary value that represents the end of an unbounded window frame. |
| currentRow | Boundary value that represents the current row in a window frame. |
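A quick sketch of the boundary constants in use (the df and its id and category columns are hypothetical, mirroring the examples below): an unbounded frame makes an aggregate span the whole partition.

from pyspark.sql import Window, functions as sf

# Frame spans the entire partition, so every row gets its partition's total
window = Window.partitionBy("category").rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing
)
df.withColumn("category_total", sf.sum("id").over(window))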
Methods

| Method | Description |
|---|---|
| orderBy(*cols) | Creates a WindowSpec with the ordering defined. |
| partitionBy(*cols) | Creates a WindowSpec with the partitioning defined. |
| rangeBetween(start, end) | Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive), where start and end are range-based offsets relative to the current row's ORDER BY value. |
| rowsBetween(start, end) | Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive), where start and end are row-based offsets relative to the current row. |
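Each of these methods returns a WindowSpec, so they can be chained; the spec computes nothing by itself and is applied by passing it to a window function's over(). A minimal sketch, assuming a df with id and category columns:

from pyspark.sql import Window, functions as sf

spec = Window.partitionBy("category").orderBy("id").rowsBetween(-1, Window.currentRow)
# Average of the previous row's id and the current id within each category
df.withColumn("avg_id", sf.avg("id").over(spec))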
Notes

When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.
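A sketch of what these defaults mean in practice, again assuming a df with id and category columns: without orderBy the aggregate sees the whole partition; with orderBy it becomes a running aggregate up to the current row.

from pyspark.sql import Window, functions as sf

# No ordering: default frame is the whole partition (per-partition total)
df.withColumn("total", sf.sum("id").over(Window.partitionBy("category")))
# With ordering: default frame is unboundedPreceding..currentRow (running sum)
df.withColumn("running", sf.sum("id").over(Window.partitionBy("category").orderBy("id")))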
Examples

Basic window with ordering and a row frame
from pyspark.sql import Window
# ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
Partitioned window with a range frame
from pyspark.sql import Window
# PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
Row number within each partition
from pyspark.sql import Window, functions as sf
df = spark.createDataFrame(
[(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)
# Show row number ordered by id within each category partition
window = Window.partitionBy("category").orderBy("id")
df.withColumn("row_number", sf.row_number().over(window)).show()
Sum over a row-based frame
from pyspark.sql import Window, functions as sf
df = spark.createDataFrame(
[(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)
# Sum id values from the current row to the next row within each partition
window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
df.withColumn("sum", sf.sum("id").over(window)).sort("id", "category", "sum").show()
Sum over a range-based frame
from pyspark.sql import Window, functions as sf
df = spark.createDataFrame(
[(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)
# Sum id values from the current id value to id + 1 within each partition
window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
df.withColumn("sum", sf.sum("id").over(window)).sort("id", "category").show()