Megosztás a következőn keresztül:


Anomáliadetektálás_Változáspont

✅ Azure Stream Analytics ✅ Fabric Eventstream

Állandó anomáliákat észlel egy idősoros eseményfolyamban. A mögöttes gépi tanulási modell az Exchangeability Martingales algoritmust használja.

Szemantika

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

Érvek

scalar_expression

Az eseményoszlop vagy a számított mező, amelyen a modell anomáliadetektálést végez. A paraméter megengedett értékei közé tartoznak a FLOAT vagy BIGINT adattípusok, amelyek egyetlen (skaláris) értéket adnak vissza.

A *helyettesítő karakter nem használható. Emellett scalar_expression nem tartalmazhat más elemzési függvényeket vagy külső függvényeket.

bizalom

Százalékos szám 1,00-tól 100-ra (beleértve) a gépi tanulási modell érzékenységét. Minél alacsonyabb a megbízhatóság, annál nagyobb az észlelt anomáliák száma, és fordítva. 70 és 90 közötti tetszőleges számból induljon ki, és módosítsa ezt a fejlesztés vagy tesztelés során megfigyelt eredmények alapján.

historySize

A tolóablakban lévő események száma, amelyekből a modell folyamatosan tanul, és a következő esemény rendellenességek esetén történő pontozására használható. Ennek általában a normál viselkedés időtartamát kell jelölnie, hogy a modell megjelölhessen egy későbbi rendellenességet. Kezdje képzett becsléssel az előzménynaplók használatával, és módosítsa a fejlesztés vagy tesztelés során megfigyelt eredmények alapján.

OVER ([ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause

A modell betanításának particionálására szolgál az események egy adott oszlopa alapján. A modell ugyanazokat a függvényparaméter-beállításokat alkalmazza az összes partícióra.

limit_duration_clause DURATION(egység, hossz)

A Csúsztatási ablak mérete a Stream Analyticsben az idő függvényében. Ennek az időablaknak az ajánlott mérete megegyezik azzal az idővel, amely az előzmények létrehozásához szükséges.

when_clause

Logikai feltételt ad meg a modellnek az anomáliadetektáláshoz szolgáltatandó eseményekhez. A when_clause megadása nem kötelező.

Visszatérési típusok

A függvény egy beágyazott rekordot ad vissza, amely a következő oszlopokból áll:

IsAnomaly

BIGINT (0 vagy 1), amely azt jelzi, hogy az esemény rendellenes volt-e vagy sem.

Pontszám

A kiszámított Martingale-pontszám (lebegőpontos), amely azt jelzi, hogy milyen rendellenes egy esemény. Ez a pontszám exponenciálisan növekszik rendellenes értékekkel.

Példák

A következő lekérdezési mintában az első lekérdezés 5 percenként feltételez egy eseményt, a második lekérdezés pedig másodpercenként egy eseményt. A megbízhatósági szint mindkét modell esetében 75-ös értékre van állítva.

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

Példa 1 esemény másodpercenkénti egységes bemeneti sebességére egy 20 perces tolóablakban, amelynek előzménymérete 1200 esemény. A végső SELECT utasítás kinyeri és kiírja a pontszámot és az anomália státuszát 80%bizalmi szinttel.

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Példa egy nem egységes bemeneti adatfolyamra, amely egy 1 másodperces ugróablakkal egységessé lett:

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Példa egy particionált lekérdezésre, amely érzékelőnként külön modellt tanít be:

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep