LAG (Azure Stream Analytics)

LAG 分析演算子を使用すると、特定の制約内で、イベント ストリーム内の "前" イベントを検索できます。 変数の増加率を計算したり、変数がしきい値を超えた場合、または条件が開始または停止したときに true を検出したりする場合に非常に便利です。

Stream Analytics では、LAG のスコープ (つまり、現在のイベントから見る必要がある履歴までの距離) は、LIMIT DURATION 句を使用して、常に有限の時間間隔に制限されます。 LAG は、必要に応じて、PARTITION BY 句と WHEN 句を使用して、特定のプロパティまたは条件の現在のイベントと一致するイベントのみを考慮するように制限できます。

LAG は、WHERE 句の述語、JOIN 句の結合条件、または現在のクエリの GROUP BY 句のグループ化式の影響を受けません。これらの句の前に評価されるためです。

構文

LAG(<scalar_expression >, [<offset >], [<default>])  
     OVER ([PARTITION BY <partition key>] LIMIT DURATION(<unit>, <length>) [WHEN boolean_expression])
  

たとえば、次のようになります。

LAG(reading) OVER (LIMIT DURATION(hour, 3))  
LAG(name, 2, 'none such') OVER (PARTITION BY userId LIMIT DURATION(minute, 2))  

引数

scalar_expression

指定されたオフセットに基づいて返される値。 単一 (スカラー) 値を返す何らかの種類の式とワイルドカード式「*」のいずれかになります。 '*' の場合、指定したオフセットに従ってイベント全体が返され、結果イベント (入れ子になったレコード) に格納されます。
scalar_expression に他の分析関数または外部関数を含めることはできません。

offset

値を取得する現在のイベントから戻るイベントの数。 指定しない場合、既定値は 1 です。これは、前のイベントを返します。 オフセットは、1 以上の整数である必要があります。 イベントは、時間的な順序で処理されます。 同じタイム スタンプのイベントが複数ある場合、イベントは到着順に処理されます。

default

指定したオフセットでイベントが存在しない場合に返す値。 既定値を指定しない場合、NULL が返されます。 'No event at the specified offset' can be case 1) if the number of corresponding events seen than the specified offset or 2) if the event at the specified offset is timed out according the specified limit_duration_clause 3) events exist but not match boolean condition specified in the when_clause.

指定したオフセットのイベントが存在し、scalar_expressionの値が NULL の場合は NULL になります。
返されます。 default には列、サブクエリ、またはその他の式を指定できますが、他の式を含めることはできません
分析関数または外部関数。 default には、 とまったく同じ型が必要です
scalar_expression。

OVER ( [ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause PARTITION BY <パーティション キー> 句は、 の値が のイベントのみを要求します
<パーティション キー> は、現在のイベントが考慮されるのと同じです。 たとえば、

LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))  

は、現在のイベントと同じセンサーの以前の読み取り値を返します (前の 1 時間以内に発生した場合)。

limit_duration 句 DURATION(<unit>, <length>)

現在のイベントの履歴を考慮する必要がある量を指定します。 サポートされている単位とその省略形について詳しくは、DATEDIFF をご覧ください。 DURATION 間隔内に十分な一致イベントが見つからない場合は、 <既定値> が返されます。

when_clause
LAG 計算で考慮されるイベントのブール条件を指定します。 DURATION 間隔内に十分な一致イベントが見つからない場合は、 <既定値> が返されます。 when_clauseは省略可能です。

戻り値の型

指定した scalar_expression のデータ型。 scalar_expression の場合、NULL が返されます。

全般的な解説

LAG は非決定的です。 イベントは、時間的な順序で処理されます。 同じタイム スタンプのイベントが複数ある場合、イベントは到着順に処理されます。

ウィンドウ関数の結果セットに LAG を適用すると、予期しない結果が生成される可能性があります。 ウィンドウ操作では、ウィンドウの最後にイベントが出力されるように、ウィンドウ関数によってイベントのタイムスタンプが変更されます。 イベントの現在のタイムスタンプには system.timestamp()を使用してアクセスできます。ウィンドウ操作後は、元のイベント時刻属性とは異なります。 ウィンドウ操作の前に LAG を移動できない場合は、 CollectTop の使用を検討し、元のイベント時間で並べ替えます。

センサーごとの増加率を計算します。

SELECT sensorId,  
       growth = reading -
                        LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))  
FROM input  
  

以前の not-null センサーの読み取りを検索します。

SELECT  
     sensorId,  
     LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN reading IS NOT NULL)  
     FROM input  
  

特定のセンサーの種類について、以前の null 以外のセンサー読み取りを検索します。

WITH filterSensor AS
(
  SELECT *
  FROM input
  WHERE input.sensorType = 4 AND sensorId IS NOT NULL
)

SELECT
  LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))
FROM filterSensor

変数がしきい値を超えたタイミングを決定します。

SELECT
    sensorId, reading
FROM input
WHERE
    devicetype = 'thermostat'
    AND reading > 100
    AND LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN devicetype = 'thermostat') <= 100

参照

ISFIRST (Azure Stream Analytics)
LAST (Azure Stream Analytics)