AnomalyDetection_SpikeAndDip (Azure Stream Analytics)

Erkennt temporäre Anomalien in einem Zeitreihenereignis.

Das zugrunde liegende Machine Learning-Modell verwendet den Algorithmus für die adaptive Kerneldichteschätzung.

Syntax

AnomalyDetection_SpikeAndDip(
    <scalar_expression>,
    <confidence>,
    <historySize>,
    <mode>)
OVER ([PARTITION BY <partition key>]
    LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

Argumente

scalar_expression

Die Ereignisspalte oder das berechnete Feld, über das das Modell Anomalieerkennung durchführt. Zulässige Werte für diesen Parameter sind FLOAT- oder BIGINT-Datentypen, die einen einzelnen (skalaren) Wert zurückgeben.

Der Wildcardausdruck * ist nicht zulässig. Außerdem können scalar_expression keine anderen Analysefunktionen oder externen Funktionen enthalten.

Vertrauen

Eine Prozentzahl zwischen 1,00 und 100 (einschließlich), die die Empfindlichkeit des Machine Learning-Modells festlegt. Je niedriger die Konfidenz, desto höher ist die Anzahl der erkannten Anomalien und umgekehrt. Beginnen Sie mit einer beliebigen Zahl zwischen 70 und 90, und passen Sie diese basierend auf den Ergebnissen an, die in der Entwicklung oder im Test beobachtet wurden.

historySize

Die Anzahl der Ereignisse in einem gleitenden Fenster, von dem das Modell kontinuierlich lernt und verwendet, um das nächste Ereignis auf anomale Weise zu bewerten. In der Regel sollte dies den Zeitraum des normalen Verhaltens darstellen, damit das Modell eine nachfolgende Anomalie kennzeichnen kann. Beginnen Sie mit einer fundierten Schätzung mithilfe von Verlaufsprotokollen, und passen Sie sich basierend auf den Ergebnissen an, die in der Entwicklung oder im Test beobachtet wurden.

mode

Ein Zeichenfolgenparameter, dessen Wert "spikes", "dips" oder "spikesanddips" lautet, um nur Spitzen, nur Einbrüche oder beide Spitzen bzw. Dips zu erkennen.

OVER ( [ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause

Wird verwendet, um das Training eines Modells basierend auf einer bestimmten Spalte in den Ereignissen zu partitionieren. Das Modell wendet die gleichen Funktionsparametereinstellungen auf alle Partitionen an.

limit_duration_clause DURATION(unit, length)

Die Größe des gleitenden Fensters in Stream Analytics in Bezug auf die Zeit. Die empfohlene Größe dieses Zeitfensters entspricht der Zeit, die zum Generieren von historySize number of events in steady state benötigt wird.

when_clause

Gibt eine boolesche Bedingung für die Ereignisse an, die vom Modell akzeptiert werden sollen, um Anomalieerkennung durchzuführen. Die when_clause ist optional.

Rückgabetypen

Die Funktion gibt einen geschachtelten Datensatz zurück, der aus den folgenden Spalten besteht:

IsAnomaly

Ein BIGINT (0 oder 1), der angibt, ob das Ereignis anormale Ereignisse war oder nicht.

Wert

Der berechnete p-Wert-Score (float), der angibt, wie anomale Ereignisse sind. Niedrigere Bewertungen bedeuten eine geringere Wahrscheinlichkeit, dass das Ereignis Teil derselben Verteilung ist, und daher desto anomaler ist es.

Beispiele

Im folgenden Beispiel wird eine einheitliche Eingaberate von 1 Ereignis pro Sekunde in einem 2-minütigen gleitenden Fenster mit einer Verlaufsgröße von 120 Ereignissen angenommen. Mit einem Zuverlässigkeitsgrad von 95% extrahiert die letzte SELECT-Anweisung Ergebnis und Anomalienstatus und gibt sie aus.

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME as time,
        CAST(temperature AS FLOAT) as temp,
        AnomalyDetection_SpikeAndDip(CAST(temperature AS FLOAT), 95, 120, 'spikesanddips')
        OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') as FLOAT) AS
    SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS BIGINT) AS
    IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep

Beispiel mit einem nicht uniformen Eingabedatenstrom, der mithilfe eines rollierenden Fensters von 1 Sekunde einheitlich gestaltet wird:

WITH SmootheningStep AS
(
    SELECT
        System.Timestamp() as time,
        AVG(CAST(temperature as float)) as temp
    FROM input
    GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
    SELECT
    time,
    temp,
    AnomalyDetection_SpikeAndDip(temp, 95, 120, 'spikesanddips') 
        OVER(LIMIT DURATION(second, 120)) as SpikeAndDipScores
    FROM SmootheningStep
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS FLOAT) As
    SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS BIGINT) AS
    IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep

Beispiel mit einer partitionierten Abfrage zum Trainieren eines separaten Modells pro Sensor:

WITH AnomalyDetectionStep AS
(
    SELECT
        sensorid,
        System.Timestamp() AS time,
        CAST(temperature AS FLOAT) AS temp,
        AnomalyDetection_SpikeAndDip(CAST(temperature AS FLOAT), 95, 120, 'spikesanddips')
            OVER(PARTITION BY sensorid LIMIT DURATION(second, 120)) AS SpikeAndDipScores
    FROM input
)
SELECT
    CAST (sensorid AS NVARCHAR(max)) AS sensoridstring,
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') as float) AS
    SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS BIGINT) AS
    IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep