Поделиться через


Отставание

Функция окна: возвращает значение, которое является offset строками до текущей строки, и default если до текущей строки меньше offset строк. Например, один offset из них вернет предыдущую строку в любой точке в разделе окна.

Это эквивалентно функции LAG в SQL.

Синтаксис

from pyspark.sql import functions as sf

sf.lag(col, offset=1, default=None)

Параметры

Параметр Тип Description
col pyspark.sql.Column или имя столбца Имя столбца или выражения.
offset int, необязательный Число расширяемых строк. По умолчанию 1.
default optional Значение по умолчанию.

Возвраты

pyspark.sql.Column: значение до текущей строки на offsetоснове .

Примеры

Пример 1. Использование задержки для получения предыдущего значения

from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
df.show()
+---+---+
| c1| c2|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  b|  8|
|  b|  2|
+---+---+
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2").over(w)).show()
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
|  a|  1|          NULL|
|  a|  2|             1|
|  a|  3|             2|
|  b|  2|          NULL|
|  b|  8|             2|
+---+---+--------------+

Пример 2. Использование задержки со значением по умолчанию

from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2", 1, 0).over(w)).show()
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
|  a|  1|             0|
|  a|  2|             1|
|  a|  3|             2|
|  b|  2|             0|
|  b|  8|             2|
+---+---+--------------+

Пример 3. Использование задержки со смещением 2

from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2", 2, -1).over(w)).show()
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
|  a|  1|            -1|
|  a|  2|            -1|
|  a|  3|             1|
|  b|  2|            -1|
|  b|  8|            -1|
+---+---+--------------+