AnomalyDetection_ChangePoint (Azure Stream Analytics)

Erkennt persistente Anomalien in einem Zeitreihenereignisstream. Das zugrunde liegende Machine Learning-Modell verwendet den Martingales-Algorithmus für Austauschbarkeit.

Syntax

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

Argumente

scalar_expression

Die Ereignisspalte oder das berechnete Feld, über das das Modell Anomalieerkennung durchführt. Zulässige Werte für diesen Parameter sind FLOAT- oder BIGINT-Datentypen, die einen einzelnen (skalaren) Wert zurückgeben.

Der Platzhalterausdruck * ist nicht zulässig. Außerdem können scalar_expression keine anderen Analysefunktionen oder externe Funktionen enthalten.

Vertrauen

Eine Prozentzahl von 1,00 bis 100 (einschließlich), die die Vertraulichkeit des Machine Learning-Modells festlegt. Je geringer die Zuverlässigkeit, desto höher ist die Anzahl der erkannten Anomalien und umgekehrt. Beginnen Sie mit einer beliebigen Zahl zwischen 70 und 90, und passen Sie diese basierend auf den bei der Entwicklung oder im Test beobachteten Ergebnissen an.

historySize

Die Anzahl der Ereignisse in einem gleitenden Fenster, die das Modell kontinuierlich lernt und zum Bewerten des nächsten Ereignisses auf Anomalie verwendet. In der Regel sollte dies den Zeitraum des normalen Verhaltens darstellen, damit das Modell eine nachfolgende Anomalie kennzeichnen kann. Beginnen Sie mit einer fundierten Schätzung mithilfe von Verlaufsprotokollen, und passen Sie sie basierend auf den bei der Entwicklung oder im Test beobachteten Ergebnissen an.

OVER ([ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause

Wird verwendet, um das Training eines Modells basierend auf einer bestimmten Spalte in den Ereignissen zu partitionieren. Das Modell wendet dieselben Funktionsparametereinstellungen auf alle Partitionen an.

limit_duration_clause DURATION(Einheit, Länge)

Die Größe des gleitenden Fensters in Stream Analytics in Bezug auf die Zeit. Die empfohlene Größe dieses Zeitfensters entspricht der Zeit, die zum Generieren von HistorySize number of events in steady state benötigt wird.

when_clause

Gibt die boolesche Bedingung für die Ereignisse an, die dem Modell zur Durchführung der Anomalieerkennung bereitgestellt werden sollen. Die when_clause ist optional.

Rückgabetypen

Die Funktion gibt einen geschachtelten Datensatz aus den folgenden Spalten zurück:

IsAnomaly

Ein BIGINT (0 oder 1), der angibt, ob das Ereignis anomale Ereignisse war oder nicht.

Wert

Die berechnete Martingale-Bewertung (float) gibt an, wie anomale Ereignisse sind. Diese Bewertung wächst exponentiell mit anomalen Werten.

Beispiele

Im folgenden Abfragebeispiel geht die erste Abfrage alle fünf Minuten von einem Ereignis aus, während die zweite Abfrage jede Sekunde von einem Ereignis ausgeht. Das Konfidenzniveau ist für beide Modelle auf 75 festgelegt.

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

Beispiel: Annahme einer einheitlichen Eingaberate von 1 Ereignis pro Sekunde in einem gleitenden Fenster von 20 Minuten mit einer Verlaufsgröße von 1200 Ereignissen. Mit einem Zuverlässigkeitsgrad von 80% extrahiert die letzte SELECT-Anweisung Ergebnis und Anomalienstatus und gibt sie aus.

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Beispiel für einen nicht gleichmäßigen Eingabestrom, der mithilfe eines taumelnden Fensters von 1 Sekunde einheitlich gestaltet wird:

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Beispiel mit einer partitionierten Abfrage zum Trainieren eines separaten Modells pro Sensor:

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep