Compartilhar via


session_window

Gera uma janela de sessão a partir de uma coluna que especifica o carimbo de data/hora.

A janela de sessão é uma das janelas dinâmicas, o que significa que o comprimento da janela está variando de acordo com as entradas fornecidas. O comprimento da janela de sessão é definido como "o carimbo de data/hora da entrada mais recente da sessão + duração da lacuna", portanto, quando as novas entradas são associadas à janela de sessão atual, a janela de hora de término da sessão pode ser expandida de acordo com as novas entradas.

O Windows pode dar suporte à precisão de microssegundos. Não há suporte para o Windows na ordem dos meses.

Para uma consulta de streaming, você pode usar a função current_timestamp para gerar janelas no tempo de processamento. gapDuration é fornecido como cadeias de caracteres, por exemplo, '1 segundo', '1 dia 12 horas', '2 minutos'. As cadeias de caracteres de intervalo válidas são 'week', 'day', 'hour', 'minute', 'second', 'milissegundo', 'microssegundo'.

Também pode ser uma Coluna que pode ser avaliada para a duração da lacuna dinamicamente com base na linha de entrada.

A coluna de saída será um struct chamado 'session_window' por padrão com as colunas aninhadas 'start' e 'end', em que 'start' e 'end' serão de pyspark.sql.types.TimestampType.

Para a função SQL do Databricks correspondente, consulte session_window a expressão de agrupamento.

Sintaxe

from pyspark.databricks.sql import functions as dbf

dbf.session_window(timeColumn=<timeColumn>, gapDuration=<gapDuration>)

Parâmetros

Parâmetro Tipo Description
timeColumn pyspark.sql.Column ou str O nome da coluna ou coluna a ser usado como o carimbo de data/hora para janelas por tempo. A coluna de tempo deve ser de TimestampType ou TimestampNTZType.
gapDuration pyspark.sql.Column ou literal string Uma coluna ou literal de cadeia de caracteres python que especifica o tempo limite da sessão. Pode ser um valor estático, por exemplo 10 minutes, 1 secondou uma expressão/UDF que especifica a duração da lacuna dinamicamente com base na linha de entrada.

Devoluções

pyspark.sql.Column: a coluna para resultados computados.

Exemplos

from pyspark.databricks.sql import functions as dbf
df = spark.createDataFrame([('2016-03-11 09:00:07', 1)], ['dt', 'v'])
df2 = df.groupBy(dbf.session_window('dt', '5 seconds')).agg(dbf.sum('v'))
df2.show(truncate=False)
df2.printSchema()