Megosztás:


session_window

Munkamenetablakot hoz létre egy időbélyeget megadva, amely oszlopot ad meg.

A munkamenet-ablak a dinamikus ablakok egyike, ami azt jelenti, hogy az ablak hossza a megadott bemenettől függően változik. A munkamenet-ablak hossza "a munkamenet legújabb bemenetének időbélyege + résidő", így ha az új bemenetek az aktuális munkamenetablakhoz vannak kötve, a munkamenet-ablak befejezési ideje az új bemeneteknek megfelelően bővíthető.

A Windows támogatja a mikroszekundumos pontosságot. A hónapok sorrendjében lévő Windows nem támogatott.

Streamlekérdezés esetén a függvény current_timestamp használatával windowsokat hozhat létre a feldolgozási idő alatt. A gapDuration sztringként van megadva, például "1 másodperc", "1 nap 12 óra", "2 perc". Az érvényes intervallumsztringek a következők: "hét", "nap", "óra", "perc", "második", "ezredmásodperc", "mikroszekundum".

Olyan oszlop is lehet, amely a bemeneti sor alapján dinamikusan értékelhető ki a rések időtartamára.

A kimeneti oszlop alapértelmezés szerint egy "session_window" nevű szerkezet lesz, a beágyazott "start" és "end" oszlopokkal, ahol a "start" és a "end" lesz pyspark.sql.types.TimestampType.

A megfelelő Databricks SQL-függvényhez lásd session_window a csoportosítási kifejezést.

Szemantika

from pyspark.databricks.sql import functions as dbf

dbf.session_window(timeColumn=<timeColumn>, gapDuration=<gapDuration>)

Paraméterek

Paraméter Típus Description
timeColumn pyspark.sql.Column vagy str Az időbélyegként használni kívánt oszlop neve vagy oszlopa. Az időoszlopnak TimestampType vagy TimestampNTZType típusúnak kell lennie.
gapDuration pyspark.sql.Column vagy literal string A munkamenet időtúllépését meghatározó Python-sztringkonstans vagy -oszlop. Lehet statikus érték, például 10 minutesegy 1 secondkifejezés/UDF, amely a bemeneti sor alapján dinamikusan határozza meg a rések időtartamát.

Visszatérítések

pyspark.sql.Column: a számított eredmények oszlopa.

Példák

from pyspark.databricks.sql import functions as dbf
df = spark.createDataFrame([('2016-03-11 09:00:07', 1)], ['dt', 'v'])
df2 = df.groupBy(dbf.session_window('dt', '5 seconds')).agg(dbf.sum('v'))
df2.show(truncate=False)
df2.printSchema()