lag

Window function: returns the value that is offset rows before the current row, and default if there are fewer than offset rows before the current row. For example, an offset of one returns the previous row at any given point in the window partition.

This is equivalent to the LAG function in SQL.
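
For comparison, the same result can be expressed with the LAG function in Spark SQL. The sketch below is illustrative only: it assumes an active SparkSession and registers the sample DataFrame under the temporary view name "t", which is chosen here for illustration.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
df.createOrReplaceTempView("t")  # illustrative view name
spark.sql("""
    SELECT c1, c2,
           LAG(c2) OVER (PARTITION BY c1 ORDER BY c2) AS previous_value
    FROM t
""").show()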

Syntax

from pyspark.sql import functions as sf

sf.lag(col, offset=1, default=None)

Parameters

col : pyspark.sql.Column or column name
    Name of the column or expression.
offset : int, optional
    Number of rows before the current row to take the value from. Default is 1.
default : optional
    Default value to return when there is no row at the given offset.

Returns

pyspark.sql.Column: the value offset rows before the current row, or default if no such row exists.

Examples

Example 1: Using lag to get the previous value

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql import Window

# Create or reuse an active SparkSession so the example runs as-is.
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
df.show()
+---+---+
| c1| c2|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  b|  8|
|  b|  2|
+---+---+
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2").over(w)).show()
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
|  a|  1|          NULL|
|  a|  2|             1|
|  a|  3|             2|
|  b|  2|          NULL|
|  b|  8|             2|
+---+---+--------------+

Example 2: Using lag with a default value

from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2", 1, 0).over(w)).show()
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
|  a|  1|             0|
|  a|  2|             1|
|  a|  3|             2|
|  b|  2|             0|
|  b|  8|             2|
+---+---+--------------+

Example 3: Using lag with an offset of 2

from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2", 2, -1).over(w)).show()
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
|  a|  1|            -1|
|  a|  2|            -1|
|  a|  3|             1|
|  b|  2|            -1|
|  b|  8|            -1|
+---+---+--------------+
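
A common follow-up is to combine lag with ordinary column arithmetic, for example to compute the change from the previous value within each partition. The sketch below reuses the df and w defined in Example 3; the output column name c2_delta is chosen purely for illustration.

# Difference between the current c2 and the previous c2 within each partition.
# The first row of each partition has no previous value, so the result is NULL there.
df.withColumn("c2_delta", sf.col("c2") - sf.lag("c2").over(w)).show()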