Пользовательские статистические выражения JavaScript в Azure Stream Analytics

Azure Stream Analytics поддерживает пользовательские статистические выражения (UDA) на языке JavaScript, что позволяет реализовывать сложную бизнес-логику с отслеживанием состояния. В UDA у вас есть полный контроль над структурой данных состояния, накоплением состояния, деаккумуляции состояния и статистическим вычислением результатов. В этой статье представлены два разных интерфейса пользовательских определяемых функций (UDA) JavaScript, а также инструкции по созданию пользовательской определяемой функции и способы её использования в операциях на основе окон в запросе Stream Analytics.

Пользовательские статистические выражения JavaScript

Пользовательская агрегатная функция используется в соответствии с заданной временной спецификацией для агрегирования событий в этом окне и получения единого результирующего значения. Существуют два типа интерфейсов UDA, поддерживаемых Stream Analytics в настоящее: AccumulateOnly и AccumulateDeaccumulate. Оба типа пользовательских статистических выражений можно использовать для "переворачивающегося", "прыгающего", скользящего окна и окна сеанса. Пользовательский агрегатный оператор AccumulateDeaccumulate работает лучше, чем AccumulateOnly, в сочетании с "Hopping Window", "Sliding Window" и "Session Window". Можно выбрать один из двух типов в зависимости от используемого алгоритма.

Статистические выражения AccumulateOnly

Агрегаты AccumulateOnly могут накапливать только новые события в своем состоянии; их алгоритм не допускает деаккумуляции значений. Выберите этот тип агрегата, если невозможно реализовать вычитание информации о событии из значения состояния. Ниже приведен шаблон JavaScript для агрегатов AccumulateOnly:

// Sample UDA which state can only be accumulated.
function main() {
    this.init = function () {
        this.state = 0;
    }

    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    this.computeResult = function () {
        return this.state;
    }
}

Статистические выражения AccumulateDeaccumulate

Агрегаты AccumulateDeaccumulate позволяют удалять ранее накопленное значение из состояния, например, удалять пару "ключ-значение" из списка значений события или вычитать значение из состояния агрегата суммы. Ниже приведен шаблон JavaScript для агрегатов AccumulateDeaccumulate.

// Sample UDA which state can be accumulated and deaccumulated.
function main() {
    this.init = function () {
        this.state = 0;
    }

    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    this.deaccumulate = function (value, timestamp) {
        this.state -= value;
    }

    this.deaccumulateState = function (otherState){
        this.state -= otherState.state;
    }

    this.computeResult = function () {
        return this.state;
    }
}

UDA - объявление функции JavaScript

Каждая пользовательская функция JavaScript определяется объявлением объекта Function. Ниже приведены основные элементы в определении UDA.

Псевдоним функции

Псевдоним функции является идентификатором пользовательского агрегата. При вызове в запросе Stream Analytics всегда используйте псевдоним UDA вместе с префиксом "uda".

Тип функции *

Для пользовательского статистического выражения должен быть указан тип функции Javascript UDA.

Тип выходных данных

Определенный тип, который поддерживает задание Stream Analytics, или "Любой", если вы хотите обработать тип в вашем запросе.

Имя функции

Имя этого объекта функции. Имя функции должно совпадать с псевдонимом UDA.

Метод init()

Метод init() инициализирует состояние агрегата. Этот метод вызывается в начале окна.

Метод accumulate()

Метод accumulate() вычисляет состояние UDA (пользовательского агрегатного выражения) на основе предыдущего состояния и значений текущего события. Этот метод вызывается, когда событие попадает во временное окно (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW или SESSIONWINDOW).

Метод деаккумулирования (deaccumulate)

Метод deaccumulate() повторно определяет состояние на основе значений предыдущего состояния и текущего события. Этот метод вызывается, когда событие покидает окно SLIDINGWINDOW или SESSIONWINDOW.

Метод deaccumulateState()

Метод deaccumulateState() повторно определяет состояние на основе предыдущего состояния и состояния перехода. Этот метод вызывается, когда набор событий покидает окно HOPPINGWINDOW.

Метод computeResult()

Метод computeResult() возвращает агрегированный результат на основе текущего состояния. Этот метод вызывается в конце временного окна (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW или SESSIONWINDOW).

Поддерживаемые типы входных и выходных данных для пользовательских агрегатов JavaScript

Типы данных для агрегатов, определяемых пользователем (UDA) в JavaScript, приведены в разделе Преобразование типов Stream Analytics и JavaScript документа Интеграция пользовательских функций JavaScript.

Добавление созданной пользователем агрегатной функции JavaScript на портале Azure

Ниже рассматривается процесс создания пользовательского атрибута (UDA) через Portal. В примере, который здесь используется, вычисляются средневзвешенные по времени значения.

Теперь создадим UDA JavaScript в существующем задании ASA, следуя следующим шагам.

  1. Выполните вход на портал Azure и найдите ваше существующее задание Stream Analytics.

  2. Щелкните ссылку "Функции" в разделе Топология задания.

  3. Нажмите кнопку Добавить для добавления новой функции.

  4. В представлении "Новая функция" выберите тип функции JavaScript UDA, и вы увидите, что в редакторе отображается шаблон пользовательского определения по умолчанию.

  5. Введите "TWA" в качестве псевдонима UDA и измените реализацию функции следующим образом.

    // Sample UDA which calculate Time-Weighted Average of incoming values.
    function main() {
        this.init = function () {
            this.totalValue = 0.0;
            this.totalWeight = 0.0;
        }
    
        this.accumulate = function (value, timestamp) {
            this.totalValue += value.level * value.weight;
            this.totalWeight += value.weight;
    
        }
    
        // Uncomment below for AccumulateDeaccumulate implementation
        /*
        this.deaccumulate = function (value, timestamp) {
            this.totalValue -= value.level * value.weight;
            this.totalWeight -= value.weight;
        }
    
        this.deaccumulateState = function (otherState){
            this.state -= otherState.state;
            this.totalValue -= otherState.totalValue;
            this.totalWeight -= otherState.totalWeight;
        }
        */
    
        this.computeResult = function () {
            if(this.totalValue == 0) {
                result = 0;
            }
            else {
                result = this.totalValue/this.totalWeight;
            }
            return result;
        }
    }
    
  6. Когда вы выберете кнопку «Сохранить», ваш пользовательский атрибут (UDA) появится в списке функций.

  7. Выберите новую функцию TWA, и вы сможете проверить ее определение.

Вызов пользовательской агрегатной функции JavaScript в запросе ASA

На портале Azure откройте задание, измените запрос и вызовите функцию TWA() с обязательным префиксом "uda". Например:

WITH value AS
(
    SELECT
    NoiseLevelDB as level,
    DurationSecond as weight
FROM
    [YourInputAlias] TIMESTAMP BY EntryTime
)
SELECT
    System.Timestamp as ts,
    uda.TWA(value) as NoseDoseTWA
FROM value
GROUP BY TumblingWindow(minute, 5)

Тестирование запроса с универсальным определения данных

Создайте локальный JSON-файл с приведенным ниже содержимым, передайте его в задание Stream Analytics и проверьте приведенный выше запрос.

[
  {"EntryTime": "2017-06-10T05:01:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 22.0},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 81, "DurationSecond": 37.8},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 85, "DurationSecond": 26.3},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 95, "DurationSecond": 13.7},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 88, "DurationSecond": 10.3},
  {"EntryTime": "2017-06-10T05:05:00-07:00", "NoiseLevelDB": 103, "DurationSecond": 5.5},
  {"EntryTime": "2017-06-10T05:06:00-07:00", "NoiseLevelDB": 99, "DurationSecond": 23.0},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 1.76},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 79, "DurationSecond": 17.9},
  {"EntryTime": "2017-06-10T05:08:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 27.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 91, "DurationSecond": 17.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 115, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 28.3},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 55, "DurationSecond": 18.2},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 25.8},
  {"EntryTime": "2017-06-10T05:11:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 11.4},
  {"EntryTime": "2017-06-10T05:12:00-07:00", "NoiseLevelDB": 89, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 112, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 9.7},
  {"EntryTime": "2017-06-10T05:18:00-07:00", "NoiseLevelDB": 96, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 0.99},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 113, "DurationSecond": 25.1},
  {"EntryTime": "2017-06-10T05:22:00-07:00", "NoiseLevelDB": 110, "DurationSecond": 5.3}
]

Получить помощь

Для получения дополнительной помощи воспользуйтесь страницей вопросов и ответов о Microsoft Azure Stream Analytics.

Следующие шаги