Compartir a través de


AnomalyDetection_ChangePoint (Azure Stream Analytics)

Detecta anomalías persistentes en un flujo de eventos de serie temporal. El modelo de aprendizaje automático subyacente usa el algoritmo Martingales de exchangeability.

Sintaxis

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

Argumentos

scalar_expression

Columna de evento o campo calculado sobre el que el modelo realiza la detección de anomalías. Los valores permitidos para este parámetro incluyen tipos de datos FLOAT o BIGINT que devuelven un valor único (escalar).

No se permite la expresión comodín * . Además, scalar_expression no pueden contener otras funciones analíticas ni funciones externas.

Confianza

Un número porcentual entre 1,00 y 100 (ambos incluidos) que establece la sensibilidad del modelo de aprendizaje automático. Cuanto menor sea la confianza, mayor será el número de anomalías detectadas y viceversa. Comience desde un número arbitrario entre 70 y 90 y ajuste esto en función de los resultados observados en desarrollo o pruebas.

historySize

Número de eventos en una ventana deslizante de la que el modelo aprende continuamente y usa para puntuar el siguiente evento para la anómala. Normalmente, esto debe representar el período de tiempo de comportamiento normal para permitir que el modelo marque una anomalía posterior. Comience con una estimación educada mediante registros históricos y ajuste en función de los resultados observados en desarrollo o pruebas.

OVER ([ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause

Se usa para particionar el entrenamiento de un modelo en función de una columna determinada de los eventos. El modelo aplica la misma configuración de parámetros de función en todas las particiones.

limit_duration_clause DURATION(unit, length)

Tamaño de la ventana deslizante dentro de Stream Analytics en términos de tiempo. El tamaño recomendado de este período de tiempo es el equivalente al tiempo que se tarda en generar historialSize número de eventos en estado estable.

when_clause

Especifica una condición booleana para los eventos que se van a proporcionar al modelo para realizar la detección de anomalías. El when_clause es opcional.

Tipos de valor devuelto

La función devuelve un registro anidado compuesto por las columnas siguientes:

IsAnomaly

BIGINT (0 o 1) que indica si el evento era anómalo o no.

Puntuación

Puntuación de Martingale calculada (float) que indica cómo es un evento anómalo. Esta puntuación crece exponencialmente con valores anómalos.

Ejemplos

En el ejemplo de consulta siguiente, la primera consulta supone un evento cada 5 minutos y la segunda consulta supone un evento cada segundo. El nivel de confianza se establece en 75 para ambos modelos.

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

Ejemplo que supone una velocidad de entrada uniforme de 1 evento por segundo en una ventana deslizante de 20 minutos con un tamaño de historial de 1200 eventos. La instrucción SELECT final extrae y produce la puntuación y el estado de anomalía con un nivel de confianza del 80 %.

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Ejemplo con un flujo de entrada no uniforme que se hace uniforme mediante una ventana de saltos de tamaño constante de 1 segundo:

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Ejemplo con una consulta con particiones para entrenar un modelo independiente por sensor:

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep