延隔时间(Azure 流分析)

LAG 分析运算符允许在特定约束内查找事件流中的“上一个”事件。 这对于计算变量的增长率、检测变量何时超过阈值或条件开始或停止为 true 非常有用。

在流分析中,LAG 的范围 (即,它需要查看的当前事件的历史距离) 始终使用 LIMIT DURATION 子句限制为有限的时间间隔。 可以选择性地将 LAG 限制为仅考虑使用 PARTITION BY 和 WHEN 子句的特定属性或条件上与当前事件匹配的事件。

LAG 不受 WHERE 子句中的谓词、JOIN 子句中的联接条件或当前查询的 GROUP BY 子句中的分组表达式的影响,因为先计算这些子句。

语法

LAG(<scalar_expression >, [<offset >], [<default>])  
     OVER ([PARTITION BY <partition key>] LIMIT DURATION(<unit>, <length>) [WHEN boolean_expression])
  

例如:

LAG(reading) OVER (LIMIT DURATION(hour, 3))  
LAG(name, 2, 'none such') OVER (PARTITION BY userId LIMIT DURATION(minute, 2))  

参数

scalar_expression

要根据指定偏移量返回的值。 它可以是返回单个(标量)值的任何类型的表达式或通配符表达式“*”。 对于“*”,将返回指定偏移量的整个事件,并将包含在结果事件中, (嵌套记录) 。
scalar_expression 不能包含其他分析函数或外部函数。

offset

从要从中获取值的当前事件后退的事件数。 如果未指定,则默认值为 1,这意味着它将返回上一个事件。 偏移量必须是大于或等于 1 的整数。 按时间顺序处理事件。 如果有多个事件具有相同的时间戳,则按到达的顺序处理这些事件。

default

指定的偏移量处没有事件时要返回的值。 如果未指定默认值,则返回 NULL。 如果到目前为止看到的对应事件数小于指定偏移量,则“指定偏移量无事件”可以是 1) ,如果指定偏移量的事件根据指定的limit_duration_clause 3 个事件超时,则为 2) ) 事件存在,但与when_clause中指定的布尔条件不匹配。

如果指定偏移量处的事件存在,并且 scalar_expression 的值为 NULL,则为 NULL
返回。 default 可以是列、子查询或其他表达式,但它不能包含其他表达式
分析函数或外部函数。 default 的类型必须与
scalar_expression。

OVER ( [ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause PARTITION BY <partition key> 子句仅请求其值为 的事件
<分区键与要考虑的当前事件的分区键> 相同。 例如,

LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))  

如果在前 1 小时内) 发生,则返回与当前事件 (相同的传感器的先前读数。

limit_duration 子句 DURATION (<单位>, <长度>)

指定必须考虑当前事件中的历史记录量。 有关受支持的单位及其缩写的详细说明,请参阅 DATEDIFF。 如果在 DURATION 间隔内找不到足够的匹配事件,则 <返回默认值> 。

when_clause
指定要在 LAG 计算中考虑的事件的布尔条件。 如果在 DURATION 间隔内找不到足够的匹配事件,则 <返回默认值> 。 when_clause是可选的。

返回类型

指定 scalar_expression 的数据类型。 如果 scalar_expression 为以下值,则返回 NULL

一般备注

LAG 具有不确定性。 按时间顺序处理事件。 如果有多个事件具有相同的时间戳,则按到达的顺序处理这些事件。

开窗函数 的结果集应用 LAG 可能会产生意外的结果。 窗口化函数会更改事件的时间戳,因为每个窗口操作都输出窗口末尾的事件。 在窗口操作后,可以使用 system.timestamp () 访问事件的当前时间戳,它将不同于原始事件时间属性。 如果在窗口操作之前无法移动 LAG,请考虑使用 CollectTop,按原始事件时间排序。

示例

计算每个传感器的增长率:

SELECT sensorId,  
       growth = reading -
                        LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))  
FROM input  
  

查找上一个不为 null 的传感器读数:

SELECT  
     sensorId,  
     LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN reading IS NOT NULL)  
     FROM input  
  

查找特定传感器类型的上一个非 null 传感器读数:

WITH filterSensor AS
(
  SELECT *
  FROM input
  WHERE input.sensorType = 4 AND sensorId IS NOT NULL
)

SELECT
  LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))
FROM filterSensor

确定变量何时超过阈值:

SELECT
    sensorId, reading
FROM input
WHERE
    devicetype = 'thermostat'
    AND reading > 100
    AND LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN devicetype = 'thermostat') <= 100

另请参阅

ISFIRST(Azure 流分析)
上一 (Azure 流分析)