Share via


AnomalyDetection_ChangePoint (Azure 串流分析)

偵測時間序列事件資料流程中的持續性異常。 基礎機器學習模型使用 Exchangeability Martingales 演算法。

語法

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

引數

scalar_expression

模型執行異常偵測的事件資料行或計算欄位。 此參數允許的值包括 FLOAT 或 BIGINT 資料類型,這些資料類型會傳回單一 (純量) 值。

不允許萬用字元運算式 * 。 此外, scalar_expression 不能包含其他分析函式或外部函式。

信心

從 1.00 到 100 (內含) 的百分比數位,可設定機器學習模型的敏感度。 信賴度越低,偵測到的異常數目就越高,反之亦然。 從 70 到 90 之間的任一數字開始,並根據開發或測試中所觀察到的結果來調整此值。

historySize

模型會持續學習的滑動視窗中的事件數目,並用於評分下一個事件是否有異常。 一般而言,這應該代表正常行為的期間,讓模型能夠標幟後續的異常。 從使用歷程記錄進行教育的猜測開始,並根據開發或測試中所觀察到的結果進行調整。

OVER ([ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause

用來根據事件中的特定資料行來分割模型的定型。 此模型會將相同的函式參數設定套用至所有分割區。

limit_duration_clause DURATION (單位,長度)

根據時間,串流分析內的滑動視窗大小。 此時間範圍的建議大小相當於產生 歷程記錄 所花費的時間,將事件數目調整為穩定狀態。

when_clause

指定要提供給模型以執行異常偵測之事件的布林條件。 when_clause是選擇性的。

傳回型別

函式會傳回由下列資料行組成的巢狀記錄:

IsAnomaly

BIGINT (0 或 1) 指出事件是否異常。

分數

計算的 Martingale 分數 (浮點數) 指出事件異常程度。 此分數會隨著異常值以指數方式成長。

範例

在下列查詢範例中,第一個查詢會每隔 5 分鐘假設一個事件,而第二個查詢則假設每秒一個事件。 這兩個模型的信賴等級設定為 75。

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

假設 20 分鐘滑動視窗中的一致輸入速率為每秒 1 個事件,且歷程記錄大小為 1200 個事件。 最終的 SELECT 陳述式會擷取並輸出 80% 信賴等級的分數和異常狀態。

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

使用 1 秒的輪轉視窗進行統一的非統一輸入資料流程範例:

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

使用分割查詢來針對每個感應器定型個別模型的範例:

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep