Bagikan melalui


Lag

Fungsi jendela: mengembalikan nilai yang merupakan offset baris sebelum baris saat ini, dan default jika ada kurang dari offset baris sebelum baris saat ini. Misalnya, salah offset satu akan mengembalikan baris sebelumnya pada titik tertentu di partisi jendela.

Ini setara dengan fungsi LAG di SQL.

Syntax

from pyspark.sql import functions as sf

sf.lag(col, offset=1, default=None)

Parameter-parameternya

Pengaturan Tipe Description
col pyspark.sql.Column atau nama kolom Nama kolom atau ekspresi.
offset int, opsional Jumlah baris yang akan diperluas. Pengaturan awal adalah 1.
default fakultatif Nilai bawaan.

Pengembalian Barang

pyspark.sql.Column: nilai sebelum baris saat ini berdasarkan offset.

Examples

Contoh 1: Menggunakan lag untuk mendapatkan nilai sebelumnya

from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
df.show()
+---+---+
| c1| c2|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  b|  8|
|  b|  2|
+---+---+
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2").over(w)).show()
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
|  a|  1|          NULL|
|  a|  2|             1|
|  a|  3|             2|
|  b|  2|          NULL|
|  b|  8|             2|
+---+---+--------------+

Contoh 2: Menggunakan lag dengan nilai default

from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2", 1, 0).over(w)).show()
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
|  a|  1|             0|
|  a|  2|             1|
|  a|  3|             2|
|  b|  2|             0|
|  b|  8|             2|
+---+---+--------------+

Contoh 3: Menggunakan lag dengan offset 2

from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2", 2, -1).over(w)).show()
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
|  a|  1|            -1|
|  a|  2|            -1|
|  a|  3|             1|
|  b|  2|            -1|
|  b|  8|            -1|
+---+---+--------------+