Поделиться через


session_window

Создает окно сеанса с указанием метки времени, указывающей столбец.

Окно сеанса является одним из динамических окон, что означает, что длина окна зависит от заданных входных данных. Длина окна сеанса определяется как "метка времени последнего ввода сеанса и длительности пробела", поэтому, когда новые входные данные привязаны к текущему окну сеанса, время окончания окна сеанса можно развернуть в соответствии с новыми входными данными.

Windows может поддерживать точность микросекунда. Windows в порядке месяцев не поддерживается.

Для потокового запроса можно использовать функцию current_timestamp для создания окон во время обработки. gapDuration предоставляется в виде строк, например "1 секунда", "1 день 12 часов", "2 минуты". Допустимые строки интервала: "неделя", "день", "час", "минута", "вторая", "миллисекунда", "микросекунды".

Это также может быть столбец, который можно оценить динамически на основе входной строки.

Выходной столбец будет структурой с именем "session_window" по умолчанию с вложенными столбцами "start" и "end", где будет иметь значение pyspark.sql.types.TimestampType"start" и "end".

Для соответствующей функции Databricks SQL см session_window . выражение группировки.

Синтаксис

from pyspark.databricks.sql import functions as dbf

dbf.session_window(timeColumn=<timeColumn>, gapDuration=<gapDuration>)

Параметры

Параметр Тип Description
timeColumn pyspark.sql.Column или str Имя столбца или столбец, используемый в качестве метки времени для окна по времени. Столбец времени должен иметь значение TimestampType или TimestampNTZType.
gapDuration pyspark.sql.Column или literal string Строковый литерал Python или столбец, указывающий время ожидания сеанса. Это может быть статическое значение, например 10 minutes1 second, или выражение/UDF, указывающее длительность пробела динамически на основе входной строки.

Возвраты

pyspark.sql.Column: столбец для вычисляемых результатов.

Примеры

from pyspark.databricks.sql import functions as dbf
df = spark.createDataFrame([('2016-03-11 09:00:07', 1)], ['dt', 'v'])
df2 = df.groupBy(dbf.session_window('dt', '5 seconds')).agg(dbf.sum('v'))
df2.show(truncate=False)
df2.printSchema()