Udostępnij za pośrednictwem


AnomalyDetection_ChangePoint (Azure Stream Analytics)

Wykrywa trwałe anomalie w strumieniu zdarzeń szeregów czasowych. Podstawowy model uczenia maszynowego używa algorytmu Exchangeability Martingales.

Składnia

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

Argumenty

Scalar_expression

Kolumna zdarzenia lub obliczone pole, w którym model wykonuje wykrywanie anomalii. Dozwolone wartości tego parametru obejmują typy danych FLOAT lub BIGINT zwracające pojedynczą wartość (skalarną).

Wyrażenie wieloznaczne * jest niedozwolone. Ponadto scalar_expression nie może zawierać innych funkcji analitycznych ani funkcji zewnętrznych.

Zaufania

Liczba procentowa z zakresu od 1,00 do 100 (włącznie), która określa wrażliwość modelu uczenia maszynowego. Im niższa jest pewność, tym większa liczba wykrytych anomalii i odwrotnie. Zacznij od dowolnej liczby z zakresu od 70 do 90 i dostosuj tę wartość w oparciu o wyniki obserwowane podczas programowania lub testowania.

historySize

Liczba zdarzeń w oknie przesuwanym, z którego model stale uczy się i używa go do oceniania następnego zdarzenia w przypadku anomalii. Zazwyczaj powinno to reprezentować okres normalnego zachowania, aby umożliwić modelowi flagę kolejnej anomalii. Zacznij od wykształconego odgadnięcia przy użyciu dzienników historycznych i dostosuj się na podstawie wyników obserwowanych w programowania lub teście.

OVER ([ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause

Służy do partycjonowania trenowania modelu na podstawie określonej kolumny w zdarzeniach. Model stosuje te same ustawienia parametrów funkcji we wszystkich partycjach.

limit_duration_clause DURATION(jednostka, długość)

Rozmiar okna przesuwanego w usłudze Stream Analytics pod względem czasu. Zalecany rozmiar tego przedziału czasu jest odpowiednikiem czasu generowania historiiLiczba zdarzeń w stanie stabilnym.

when_clause

Określa warunek logiczny dla zdarzeń, które mają być udostępniane modelowi w celu przeprowadzenia wykrywania anomalii. When_clause jest opcjonalny.

Typy zwracane

Funkcja zwraca zagnieżdżony rekord składający się z następujących kolumn:

IsAnomaly

BIGINT (0 lub 1) wskazujący, czy zdarzenie było nietypowe, czy nie.

Ocena

Obliczony wynik Martingale (float) wskazujący, jak nietypowe jest zdarzenie. Ten wynik rośnie wykładniczo z nietypowymi wartościami.

Przykłady

W poniższym przykładzie zapytania pierwsze zapytanie zakłada zdarzenie co 5 minut, a drugie zapytanie zakłada zdarzenie co sekundę. Poziom ufności jest ustawiony na 75 dla obu modeli.

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

Przykład przy założeniu, że jednolita szybkość wprowadzania wynosi 1 zdarzenie na sekundę w 20-minutowym oknie przesuwania z rozmiarem historii wynoszącym 1200 zdarzeń. Końcowa instrukcja SELECT wyodrębnia i generuje wynik oraz stan anomalii z poziomem ufności 80%.

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Przykład ze strumieniem wejściowym, który jest jednolity przy użyciu okna wirowania 1 sekundy:

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Przykład z partycjonowaną kwerendą w celu wytrenowania oddzielnego modelu na czujnik:

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep