Dela via


AnomalyDetection_ChangePoint (Azure Stream Analytics)

Identifierar beständiga avvikelser i en tidsseriehändelseström. Den underliggande maskininlärningsmodellen använder algoritmen Exchangeability Martingales.

Syntax

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

Argument

scalar_expression

Händelsekolumnen eller det beräknade fältet som modellen utför avvikelseidentifiering över. Tillåtna värden för den här parametern inkluderar FLOAT- eller BIGINT-datatyper som returnerar ett enda (skalärt) värde.

Jokertecknets uttryck * tillåts inte. Dessutom kan scalar_expression inte innehålla andra analysfunktioner eller externa funktioner.

Förtroende

Ett procenttal från 1,00 till 100 (inklusive) som anger maskininlärningsmodellens känslighet. Ju lägre konfidens, desto högre antal identifierade avvikelser och vice versa. Börja från ett godtyckligt tal mellan 70 och 90 och justera detta baserat på de resultat som observerats i utveckling eller testning.

historySize

Antalet händelser i ett skjutfönster som modellen kontinuerligt lär sig av och använder för att bedöma nästa händelse för avvikande. Detta bör vanligtvis representera tidsperioden för normalt beteende så att modellen kan flagga en efterföljande avvikelse. Börja med en kvalificerad gissning med hjälp av historiska loggar och justera baserat på de resultat som observerats i utveckling eller test.

ÖVER ([ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause

Används för att partitionera en modells träning baserat på en viss kolumn i händelserna. Modellen tillämpar samma funktionsparameterinställningar i alla partitioner.

limit_duration_clause DURATION(enhet, längd)

Storleken på skjutfönstret i Stream Analytics när det gäller tid. Den rekommenderade storleken för det här tidsfönstret motsvarar den tid det tar att generera historikStorlek på antalet händelser i stabilt tillstånd.

when_clause

Anger booleskt villkor för de händelser som ska tillhandahållas till modellen för att utföra avvikelseidentifiering. When_clause är valfritt.

Returtyper

Funktionen returnerar en kapslad post som består av följande kolumner:

IsAnomaly

En BIGINT (0 eller 1) som anger om händelsen var avvikande eller inte.

Poäng

Den beräknade Martingale-poängen (float) som anger hur avvikande en händelse är. Den här poängen växer exponentiellt med avvikande värden.

Exempel

I följande frågeexempel förutsätter den första frågan en händelse var femte minut och den andra frågan förutsätter en händelse varje sekund. Konfidensnivån är inställd på 75 för båda modellerna.

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

Exempel som förutsätter en enhetlig indatahastighet på 1 händelse per sekund under ett skjutfönster på 20 minuter med en historikstorlek på 1 200 händelser. Den slutliga SELECT-instruktionen extraherar och matar ut poäng- och avvikelsestatusen med en konfidensnivå på 80 %.

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Exempel med en icke-enhetlig indataström som är enhetlig med ett rullande fönster på 1 sekund:

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Exempel med en partitionerad fråga för att träna en separat modell per sensor:

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep