Share via


AnomalyDetection_ChangePoint (Azure Stream Analytics)

Deteta anomalias persistentes num fluxo de eventos de série temporal. O modelo de machine learning subjacente utiliza o algoritmo Exchangeability Martingales.

Sintaxe

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

Argumentos

scalar_expression

A coluna de eventos ou o campo calculado sobre o qual o modelo executa a deteção de anomalias. Os valores permitidos para este parâmetro incluem tipos de dados FLOAT ou BIGINT que devolvem um único valor (escalar).

A expressão universal * não é permitida. Além disso, scalar_expression não podem conter outras funções analíticas ou funções externas.

confiança

Um número percentual de 1,00 a 100 (inclusive) que define a sensibilidade do modelo de machine learning. Quanto menor for a confiança, maior será o número de anomalias detetadas e vice-versa. Comece a partir de um número arbitrário entre 70 e 90 e ajuste-o com base nos resultados observados no desenvolvimento ou teste.

historySize

O número de eventos numa janela deslizante que o modelo aprende continuamente e utiliza para marcar o próximo evento para anómalo. Normalmente, isto deve representar o período de tempo do comportamento normal para permitir que o modelo sinalize uma anomalia subsequente. Comece com uma estimativa educada através de registos históricos e ajuste-se com base nos resultados observados no desenvolvimento ou teste.

OVER ([ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause

Utilizado para particionar a preparação de um modelo com base numa coluna específica nos eventos. O modelo aplica as mesmas definições de parâmetro de função em todas as partições.

limit_duration_clause DURAÇÃO(unidade, comprimento)

O tamanho da janela deslizante no Stream Analytics em termos de tempo. O tamanho recomendado desta janela de tempo é o equivalente ao tempo que demora a gerar históricoSize o número de eventos em estado estável.

when_clause

Especifica a condição booleana para que os eventos sejam fornecidos ao modelo para efetuar a deteção de anomalias. O when_clause é opcional.

Tipos de Devolução

A função devolve um registo aninhado composto pelas seguintes colunas:

IsAnomaly

Um BIGINT (0 ou 1) que indica se o evento foi anómalo ou não.

Pontuação

A pontuação calculada do Martingale (float) que indica o quão anómalo é um evento. Esta classificação cresce exponencialmente com valores anómalos.

Exemplos

No exemplo de consulta seguinte, a primeira consulta assume um evento a cada 5 minutos e a segunda consulta pressupõe um evento a cada segundo. O nível de confiança é definido em 75 para ambos os modelos.

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

Exemplo assumindo uma taxa de entrada uniforme de 1 evento por segundo numa janela deslizante de 20 minutos com um tamanho histórico de 1200 eventos. A instrução SELECT final extrai e produz a classificação e o estado da anomalia com um nível de confiança de 80%.

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Exemplo com um fluxo de entrada não uniforme que é uniforme com uma janela em cascata de 1 segundo:

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Exemplo com uma consulta particionada para preparar um modelo separado por sensor:

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep