次の方法で共有


Window クラス

DataFrames でウィンドウを定義するためのユーティリティ関数。

Spark Connect のサポート

クラス属性

特性 説明
unboundedPreceding 無制限のウィンドウ フレームの開始位置を表す境界値。
unboundedFollowing 無制限のウィンドウ フレームの末尾を表す境界値。
currentRow ウィンドウ フレーム内の現在の行を表す境界値。

メソッド

メソッド 説明
orderBy(*cols) 順序が定義された WindowSpec を作成します。
partitionBy(*cols) パーティション分割が定義された WindowSpec を作成します。
rangeBetween(start, end) 現在の行のORDER BY値からの範囲ベースのオフセットを使用して、start (包括) からend (包括) に定義されたフレーム境界を持つ WindowSpec を作成します。
rowsBetween(start, end) 現在の行からの行ベースのオフセットを使用して、 start (包括) から end (包括) に定義されたフレーム境界を持つ WindowSpec を作成します。

メモ

順序が定義されていない場合、既定では、非連結ウィンドウ フレーム (rowFrame、unboundedPreceding、unboundedFollowing) が使用されます。 順序付けが定義されている場合、拡大するウィンドウ フレーム (rangeFrame、unboundedPreceding、currentRow) が既定で使用されます。

例示

順序付けと行フレームを含む基本ウィンドウ

from pyspark.sql import Window

# ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

範囲フレームを含むパーティション分割されたウィンドウ

from pyspark.sql import Window

# PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)

パーティション内の行番号

from pyspark.sql import Window, functions as sf

df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)

# Show row number ordered by id within each category partition
window = Window.partitionBy("category").orderBy("id")
df.withColumn("row_number", sf.row_number().over(window)).show()

行ベースのフレームを使用した合計の実行

from pyspark.sql import Window, functions as sf

df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)

# Sum id values from the current row to the next row within each partition
window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
df.withColumn("sum", sf.sum("id").over(window)).sort("id", "category", "sum").show()

範囲ベースのフレームを使用した合計の実行

from pyspark.sql import Window, functions as sf

df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)

# Sum id values from the current id value to id + 1 within each partition
window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
df.withColumn("sum", sf.sum("id").over(window)).sort("id", "category").show()