Delen via


AnomalyDetection_ChangePoint (Azure Stream Analytics)

Detecteert permanente afwijkingen in een gebeurtenisstroom van een tijdreeks. Het onderliggende machine learning-model maakt gebruik van het algoritme Exchangeability Martingales.

Syntaxis

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

Argumenten

scalar_expression

De gebeurteniskolom of het berekende veld waarop het model anomaliedetectie uitvoert. Toegestane waarden voor deze parameter omvatten FLOAT- of BIGINT-gegevenstypen die één (scalaire) waarde retourneren.

De expressie met jokertekens * is niet toegestaan. Bovendien mogen scalar_expression geen andere analytische functies of externe functies bevatten.

Vertrouwen

Een percentage van 1,00 tot 100 (inclusief) waarmee de gevoeligheid van het machine learning-model wordt ingesteld. Hoe lager de betrouwbaarheid, hoe hoger het aantal gedetecteerde afwijkingen en vice versa. Begin met een willekeurig getal tussen 70 en 90 en pas dit aan op basis van de resultaten die tijdens de ontwikkeling of test zijn waargenomen.

historySize

Het aantal gebeurtenissen in een sliding window waarvan het model continu leert en gebruikt om de volgende gebeurtenis te scoren op afwijkende gebeurtenissen. Normaal gesproken moet dit de periode van normaal gedrag vertegenwoordigen om het model in staat te stellen een volgende anomalie te markeren. Begin met een gefundeerde schatting met behulp van historische logboeken en pas deze aan op basis van de resultaten die tijdens de ontwikkeling of test zijn waargenomen.

OVER ([ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause

Wordt gebruikt om de training van een model te partitioneren op basis van een bepaalde kolom in de gebeurtenissen. Het model past dezelfde functieparameterinstellingen toe op alle partities.

limit_duration_clause DUUR(eenheid;lengte)

De grootte van het schuifvenster binnen Stream Analytics in termen van tijd. De aanbevolen grootte van dit tijdvenster is het equivalent van de tijd die nodig is om geschiedenis te genererenGrootte van het aantal gebeurtenissen in een stabiele toestand.

when_clause

Hiermee geeft u een booleaanse voorwaarde op voor de gebeurtenissen die aan het model moeten worden verstrekt om anomaliedetectie uit te voeren. De when_clause is optioneel.

Retourtypen

De functie retourneert een geneste record die bestaat uit de volgende kolommen:

IsAnomaly

Een BIGINT (0 of 1) die aangeeft of de gebeurtenis afwijkend was of niet.

Score

De berekende Martingale-score (float) die aangeeft hoe afwijkend een gebeurtenis is. Deze score groeit exponentieel met afwijkende waarden.

Voorbeelden

In het volgende queryvoorbeeld gaat de eerste query uit van een gebeurtenis om de 5 minuten en de tweede query gaat uit van een gebeurtenis om de seconde. Het betrouwbaarheidsniveau is ingesteld op 75 voor beide modellen.

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

Voorbeeld van een uniforme invoersnelheid van 1 gebeurtenis per seconde in een schuifvenster van 20 minuten met een geschiedenisgrootte van 1200 gebeurtenissen. De laatste SELECT-instructie extraheert en voert de score en anomaliestatus uit met een betrouwbaarheidsniveau van 80%.

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Voorbeeld met een niet-uniforme invoerstroom die uniform wordt gemaakt met behulp van een tumblingvenster van 1 seconde:

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Voorbeeld met een gepartitioneerde query voor het trainen van een afzonderlijk model per sensor:

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep