Поделиться через


AnomalyDetection_SpikeAndDip (Azure Stream Analytics)

Обнаруживает временные аномалии в событии временных рядов.

Базовая модель машинного обучения использует алгоритм оценки адаптивной плотности ядра.

Синтаксис

AnomalyDetection_SpikeAndDip(
    <scalar_expression>,
    <confidence>,
    <historySize>,
    <mode>)
OVER ([PARTITION BY <partition key>]
    LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

Аргументы

scalar_expression

Столбец событий или вычисляемое поле, по которому модель выполняет обнаружение аномалий. Допустимые значения для этого параметра включают типы данных FLOAT или BIGINT, возвращающие одно (скалярное) значение.

Выражение с подстановочными знаками * запрещено. Кроме того, scalar_expression не может содержать другие аналитические или внешние функции.

Уверенность

Процентное число от 1,00 до 100 (включительно), которое задает чувствительность модели машинного обучения. Чем ниже достоверность, тем выше число обнаруженных аномалий и наоборот. Начните с произвольного числа от 70 до 90 и измените его на основе результатов, наблюдаемых при разработке или тестировании.

historySize

Количество событий в скользящем окне, которые модель постоянно изучает и использует для оценки следующего события на предмет аномальной активности. Как правило, это должен представлять период времени нормального поведения, чтобы позволить модели пометить последующую аномалию. Начните с обученного предположения, используя журналы журнала, и корректируйте их на основе результатов, наблюдаемых при разработке или тестировании.

mode

Строковый параметр, значение которого равно "пики", "провалы" или "шипы", для обнаружения только пиков, только спадов или обоих пиков и спадов соответственно.

OVER ( [ предложение_partition_by ] предложение_limit_duration [предложение_when])

partition_by_clause

Используется для секционирования обучения модели на основе определенного столбца в событиях. Модель применяет одинаковые параметры параметров функции ко всем секциям.

limit_duration_clause DURATION(единица, длина)

Размер скользящего окна в Stream Analytics с точки зрения времени. Рекомендуемый размер этого временного окна эквивалентен времени, затрачиваемого на создание historySize числа событий в устойчивом состоянии.

when_clause

Задает логическое условие для событий, принимаемых моделью для обнаружения аномалий. When_clause является необязательным.

Типы возвращаемых данных

Функция возвращает вложенную запись, состоящую из следующих столбцов:

IsAnomaly

BigINT (0 или 1), указывающий, было ли событие аномальным или нет.

Оценка

Вычисленная оценка p-значения (float), указывающая, насколько аномальным является событие. Более низкие оценки означают меньшую вероятность того, что событие является частью одного распределения, и, следовательно, тем более аномальным оно является.

Примеры

В следующем примере предполагается единая скорость ввода 1 событие в секунду в скользящем окне 2 минуты с размером журнала 120 событий. Заключительная инструкция SELECT извлекает и выводит оценку и состояние аномалии с уровнем достоверности 95 %.

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME as time,
        CAST(temperature AS FLOAT) as temp,
        AnomalyDetection_SpikeAndDip(CAST(temperature AS FLOAT), 95, 120, 'spikesanddips')
        OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') as FLOAT) AS
    SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS BIGINT) AS
    IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep

Пример с неоднородным входным потоком, который становится однородным с помощью переворачивающегося окна в 1 секунду:

WITH SmootheningStep AS
(
    SELECT
        System.Timestamp() as time,
        AVG(CAST(temperature as float)) as temp
    FROM input
    GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
    SELECT
    time,
    temp,
    AnomalyDetection_SpikeAndDip(temp, 95, 120, 'spikesanddips') 
        OVER(LIMIT DURATION(second, 120)) as SpikeAndDipScores
    FROM SmootheningStep
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS FLOAT) As
    SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS BIGINT) AS
    IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep

Пример секционированного запроса для обучения отдельной модели для каждого датчика:

WITH AnomalyDetectionStep AS
(
    SELECT
        sensorid,
        System.Timestamp() AS time,
        CAST(temperature AS FLOAT) AS temp,
        AnomalyDetection_SpikeAndDip(CAST(temperature AS FLOAT), 95, 120, 'spikesanddips')
            OVER(PARTITION BY sensorid LIMIT DURATION(second, 120)) AS SpikeAndDipScores
    FROM input
)
SELECT
    CAST (sensorid AS NVARCHAR(max)) AS sensoridstring,
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') as float) AS
    SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS BIGINT) AS
    IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep