Ескертпе
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Жүйеге кіруді немесе каталогтарды өзгертуді байқап көруге болады.
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Каталогтарды өзгертуді байқап көруге болады.
Azure Stream Analytics поддерживает пользовательские статистические выражения (UDA) на языке JavaScript, что позволяет реализовывать сложную бизнес-логику с отслеживанием состояния. В UDA у вас есть полный контроль над структурой данных состояния, накоплением состояния, деаккумуляции состояния и статистическим вычислением результатов. В этой статье представлены два разных интерфейса пользовательских определяемых функций (UDA) JavaScript, а также инструкции по созданию пользовательской определяемой функции и способы её использования в операциях на основе окон в запросе Stream Analytics.
Пользовательские статистические выражения JavaScript
Пользовательская агрегатная функция используется в соответствии с заданной временной спецификацией для агрегирования событий в этом окне и получения единого результирующего значения. Существуют два типа интерфейсов UDA, поддерживаемых Stream Analytics в настоящее: AccumulateOnly и AccumulateDeaccumulate. Оба типа пользовательских статистических выражений можно использовать для "переворачивающегося", "прыгающего", скользящего окна и окна сеанса. Пользовательский агрегатный оператор AccumulateDeaccumulate работает лучше, чем AccumulateOnly, в сочетании с "Hopping Window", "Sliding Window" и "Session Window". Можно выбрать один из двух типов в зависимости от используемого алгоритма.
Статистические выражения AccumulateOnly
Агрегаты AccumulateOnly могут накапливать только новые события в своем состоянии; их алгоритм не допускает деаккумуляции значений. Выберите этот тип агрегата, если невозможно реализовать вычитание информации о событии из значения состояния. Ниже приведен шаблон JavaScript для агрегатов AccumulateOnly:
// Sample UDA which state can only be accumulated.
function main() {
this.init = function () {
this.state = 0;
}
this.accumulate = function (value, timestamp) {
this.state += value;
}
this.computeResult = function () {
return this.state;
}
}
Статистические выражения AccumulateDeaccumulate
Агрегаты AccumulateDeaccumulate позволяют удалять ранее накопленное значение из состояния, например, удалять пару "ключ-значение" из списка значений события или вычитать значение из состояния агрегата суммы. Ниже приведен шаблон JavaScript для агрегатов AccumulateDeaccumulate.
// Sample UDA which state can be accumulated and deaccumulated.
function main() {
this.init = function () {
this.state = 0;
}
this.accumulate = function (value, timestamp) {
this.state += value;
}
this.deaccumulate = function (value, timestamp) {
this.state -= value;
}
this.deaccumulateState = function (otherState){
this.state -= otherState.state;
}
this.computeResult = function () {
return this.state;
}
}
UDA - объявление функции JavaScript
Каждая пользовательская функция JavaScript определяется объявлением объекта Function. Ниже приведены основные элементы в определении UDA.
Псевдоним функции
Псевдоним функции является идентификатором пользовательского агрегата. При вызове в запросе Stream Analytics всегда используйте псевдоним UDA вместе с префиксом "uda".
Тип функции *
Для пользовательского статистического выражения должен быть указан тип функции Javascript UDA.
Тип выходных данных
Определенный тип, который поддерживает задание Stream Analytics, или "Любой", если вы хотите обработать тип в вашем запросе.
Имя функции
Имя этого объекта функции. Имя функции должно совпадать с псевдонимом UDA.
Метод init()
Метод init() инициализирует состояние агрегата. Этот метод вызывается в начале окна.
Метод accumulate()
Метод accumulate() вычисляет состояние UDA (пользовательского агрегатного выражения) на основе предыдущего состояния и значений текущего события. Этот метод вызывается, когда событие попадает во временное окно (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW или SESSIONWINDOW).
Метод деаккумулирования (deaccumulate)
Метод deaccumulate() повторно определяет состояние на основе значений предыдущего состояния и текущего события. Этот метод вызывается, когда событие покидает окно SLIDINGWINDOW или SESSIONWINDOW.
Метод deaccumulateState()
Метод deaccumulateState() повторно определяет состояние на основе предыдущего состояния и состояния перехода. Этот метод вызывается, когда набор событий покидает окно HOPPINGWINDOW.
Метод computeResult()
Метод computeResult() возвращает агрегированный результат на основе текущего состояния. Этот метод вызывается в конце временного окна (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW или SESSIONWINDOW).
Поддерживаемые типы входных и выходных данных для пользовательских агрегатов JavaScript
Типы данных для агрегатов, определяемых пользователем (UDA) в JavaScript, приведены в разделе Преобразование типов Stream Analytics и JavaScript документа Интеграция пользовательских функций JavaScript.
Добавление созданной пользователем агрегатной функции JavaScript на портале Azure
Ниже рассматривается процесс создания пользовательского атрибута (UDA) через Portal. В примере, который здесь используется, вычисляются средневзвешенные по времени значения.
Теперь создадим UDA JavaScript в существующем задании ASA, следуя следующим шагам.
Выполните вход на портал Azure и найдите ваше существующее задание Stream Analytics.
Щелкните ссылку "Функции" в разделе Топология задания.
Нажмите кнопку Добавить для добавления новой функции.
В представлении "Новая функция" выберите тип функции JavaScript UDA, и вы увидите, что в редакторе отображается шаблон пользовательского определения по умолчанию.
Введите "TWA" в качестве псевдонима UDA и измените реализацию функции следующим образом.
// Sample UDA which calculate Time-Weighted Average of incoming values. function main() { this.init = function () { this.totalValue = 0.0; this.totalWeight = 0.0; } this.accumulate = function (value, timestamp) { this.totalValue += value.level * value.weight; this.totalWeight += value.weight; } // Uncomment below for AccumulateDeaccumulate implementation /* this.deaccumulate = function (value, timestamp) { this.totalValue -= value.level * value.weight; this.totalWeight -= value.weight; } this.deaccumulateState = function (otherState){ this.state -= otherState.state; this.totalValue -= otherState.totalValue; this.totalWeight -= otherState.totalWeight; } */ this.computeResult = function () { if(this.totalValue == 0) { result = 0; } else { result = this.totalValue/this.totalWeight; } return result; } }Когда вы выберете кнопку «Сохранить», ваш пользовательский атрибут (UDA) появится в списке функций.
Выберите новую функцию TWA, и вы сможете проверить ее определение.
Вызов пользовательской агрегатной функции JavaScript в запросе ASA
На портале Azure откройте задание, измените запрос и вызовите функцию TWA() с обязательным префиксом "uda". Например:
WITH value AS
(
SELECT
NoiseLevelDB as level,
DurationSecond as weight
FROM
[YourInputAlias] TIMESTAMP BY EntryTime
)
SELECT
System.Timestamp as ts,
uda.TWA(value) as NoseDoseTWA
FROM value
GROUP BY TumblingWindow(minute, 5)
Тестирование запроса с универсальным определения данных
Создайте локальный JSON-файл с приведенным ниже содержимым, передайте его в задание Stream Analytics и проверьте приведенный выше запрос.
[
{"EntryTime": "2017-06-10T05:01:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 22.0},
{"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 81, "DurationSecond": 37.8},
{"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 85, "DurationSecond": 26.3},
{"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 95, "DurationSecond": 13.7},
{"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 88, "DurationSecond": 10.3},
{"EntryTime": "2017-06-10T05:05:00-07:00", "NoiseLevelDB": 103, "DurationSecond": 5.5},
{"EntryTime": "2017-06-10T05:06:00-07:00", "NoiseLevelDB": 99, "DurationSecond": 23.0},
{"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 1.76},
{"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 79, "DurationSecond": 17.9},
{"EntryTime": "2017-06-10T05:08:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 27.1},
{"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 91, "DurationSecond": 17.1},
{"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 115, "DurationSecond": 7.9},
{"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 28.3},
{"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 55, "DurationSecond": 18.2},
{"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 25.8},
{"EntryTime": "2017-06-10T05:11:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 11.4},
{"EntryTime": "2017-06-10T05:12:00-07:00", "NoiseLevelDB": 89, "DurationSecond": 7.9},
{"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 112, "DurationSecond": 3.7},
{"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 9.7},
{"EntryTime": "2017-06-10T05:18:00-07:00", "NoiseLevelDB": 96, "DurationSecond": 3.7},
{"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 0.99},
{"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 113, "DurationSecond": 25.1},
{"EntryTime": "2017-06-10T05:22:00-07:00", "NoiseLevelDB": 110, "DurationSecond": 5.3}
]
Получить помощь
Для получения дополнительной помощи воспользуйтесь страницей вопросов и ответов о Microsoft Azure Stream Analytics.
Следующие шаги
- Введение в Azure Stream Analytics
- Приступая к работе с Azure Stream Analytics
- Масштабирование заданий в службе Azure Stream Analytics
- Azure Stream Analytics query language reference (Справочник по языку запросов Azure Stream Analytics).
- Azure Stream Analytics management REST API reference (Справочник по API-интерфейсу REST для управления Stream Analytics).