根据指定列的时间戳生成会话窗口。
会话窗口是动态窗口之一,这意味着窗口的长度因给定输入而异。 会话窗口的长度定义为“会话的最新输入时间戳 + 间隔持续时间”,因此,当新输入绑定到当前会话窗口时,可以根据新输入扩展会话窗口的结束时间。
Windows 可以支持微秒精度。 不支持按月顺序排列的 Windows。
对于流式处理查询,可以使用函数 current_timestamp 在处理时间生成窗口。
gapDuration 以字符串的形式提供,例如“1 秒”、“1 天 12 小时”、“2 分钟”。 有效的间隔字符串为“week”、“day”、“hour”、“minute”、“second”、“毫秒”、“microsecond”。
它也可以是一个列,可以根据输入行动态评估为间隔持续时间。
默认情况下,输出列将是一个名为“session_window”的结构,其中嵌套列“start”和“end”为“start”和“end”。pyspark.sql.types.TimestampType
有关相应的 Databricks SQL 函数,请参阅 session_window 分组表达式。
Syntax
from pyspark.databricks.sql import functions as dbf
dbf.session_window(timeColumn=<timeColumn>, gapDuration=<gapDuration>)
参数
| 参数 | 类型 | Description |
|---|---|---|
timeColumn |
pyspark.sql.Column 或 str |
要用作按时间窗口的时间戳的列名或列。 时间列必须是 TimestampType 或 TimestampNTZType。 |
gapDuration |
pyspark.sql.Column 或 literal string |
指定会话超时的 Python 字符串文本或列。 它可以是静态值,例如10 minutes1 second,或表达式/UDF,用于根据输入行动态指定间隙持续时间。 |
退货
pyspark.sql.Column:计算结果的列。
例子
from pyspark.databricks.sql import functions as dbf
df = spark.createDataFrame([('2016-03-11 09:00:07', 1)], ['dt', 'v'])
df2 = df.groupBy(dbf.session_window('dt', '5 seconds')).agg(dbf.sum('v'))
df2.show(truncate=False)
df2.printSchema()