DataFrames でウィンドウを定義するためのユーティリティ関数。
Spark Connect のサポート
クラス属性
| 特性 | 説明 |
|---|---|
unboundedPreceding |
無制限のウィンドウ フレームの開始位置を表す境界値。 |
unboundedFollowing |
無制限のウィンドウ フレームの末尾を表す境界値。 |
currentRow |
ウィンドウ フレーム内の現在の行を表す境界値。 |
メソッド
| メソッド | 説明 |
|---|---|
orderBy(*cols) |
順序が定義された WindowSpec を作成します。 |
partitionBy(*cols) |
パーティション分割が定義された WindowSpec を作成します。 |
rangeBetween(start, end) |
現在の行のORDER BY値からの範囲ベースのオフセットを使用して、start (包括) からend (包括) に定義されたフレーム境界を持つ WindowSpec を作成します。 |
rowsBetween(start, end) |
現在の行からの行ベースのオフセットを使用して、 start (包括) から end (包括) に定義されたフレーム境界を持つ WindowSpec を作成します。 |
メモ
順序が定義されていない場合、既定では、非連結ウィンドウ フレーム (rowFrame、unboundedPreceding、unboundedFollowing) が使用されます。 順序付けが定義されている場合、拡大するウィンドウ フレーム (rangeFrame、unboundedPreceding、currentRow) が既定で使用されます。
例示
順序付けと行フレームを含む基本ウィンドウ
from pyspark.sql import Window
# ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
範囲フレームを含むパーティション分割されたウィンドウ
from pyspark.sql import Window
# PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
パーティション内の行番号
from pyspark.sql import Window, functions as sf
df = spark.createDataFrame(
[(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)
# Show row number ordered by id within each category partition
window = Window.partitionBy("category").orderBy("id")
df.withColumn("row_number", sf.row_number().over(window)).show()
行ベースのフレームを使用した合計の実行
from pyspark.sql import Window, functions as sf
df = spark.createDataFrame(
[(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)
# Sum id values from the current row to the next row within each partition
window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
df.withColumn("sum", sf.sum("id").over(window)).sort("id", "category", "sum").show()
範囲ベースのフレームを使用した合計の実行
from pyspark.sql import Window, functions as sf
df = spark.createDataFrame(
[(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)
# Sum id values from the current id value to id + 1 within each partition
window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
df.withColumn("sum", sf.sum("id").over(window)).sort("id", "category").show()