Agregados definidos por el usuario en JavaScript para Azure Stream Analytics

Azure Stream Analytics admite agregados definidos por el usuario (UDA) escritos en JavaScript, que le permiten implementar lógica empresarial con estado compleja. Dentro de UDA tiene control completo sobre la estructura de los datos de estado, la acumulación y desacumulación de estado y el cálculo de resultados de agregados. El artículo presenta las dos interfaces de UDA en JavaScript diferentes, los pasos para crear un UDA y cómo usar UDA con operaciones basadas en ventanas en consultas de Stream Analytics.

Agregados definidos por el usuario en JavaScript

Los agregados definidos por el usuario se usan sobre una especificación de ventana de tiempo para agregarlos sobre los eventos de esa ventana y generar un valor con un único resultado. Actualmente hay dos tipos de interfaces de UDA que admite Stream Analytics: AccumulateOnly y AccumulateDeaccumulate. Ambos tipos de UDA se pueden usar con ventanas de saltos de tamaño constante, ventanas de salto, ventanas deslizantes y ventanas de sesión. UDA AccumulateDeaccumulate funciona mejor que UDA AccumulateOnly cuando se usa en combinación con ventanas de salto, ventanas deslizantes y ventanas de sesión. El tipo que elija dependerá del algoritmo que utilice.

Agregados de AccumulateOnly

Los agregados de AccumulateOnly solo pueden acumular nuevos eventos a su estado, el algoritmo no permite la desacumulación de valores. Elija este tipo de agregado cuando sea imposible la desacumulación de la información de un evento de su valor de estado. A continuación se muestra la plantilla de JavaScript para los agregados de AccumulatOnly:

// Sample UDA which state can only be accumulated.
function main() {
    this.init = function () {
        this.state = 0;
    }

    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    this.computeResult = function () {
        return this.state;
    }
}

Agregados de AccumulateDeaccumulate

Los agregados de AccumulateDeaccumulate permiten la desacumulación de un valor acumulado anterior del estado; por ejemplo, quitar un par clave-valor de una lista de valores de evento o restar un valor de un estado de agregado de suma. A continuación se muestra la plantilla de JavaScript para los agregados de AccumulateDeaccumulate:

// Sample UDA which state can be accumulated and deaccumulated.
function main() {
    this.init = function () {
        this.state = 0;
    }

    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    this.deaccumulate = function (value, timestamp) {
        this.state -= value;
    }

    this.deaccumulateState = function (otherState){
        this.state -= otherState.state;
    }

    this.computeResult = function () {
        return this.state;
    }
}

UDA: declaración de función de JavaScript

Cada UDA de JavaScript se define mediante una declaración de objeto de función. Estos son los elementos principales de una definición de UDA.

Alias de función

El alias de función es el identificador de UDA. Cuando se llame en consultas de Stream Analytics, use siempre UDA en combinación con un prefijo "uda".

Tipo de función

En UDA, el tipo de función debe ser UDA de JavaScript.

Tipo de salida

Un tipo específico que admite el trabajo de Stream Analytics o "cualquiera" si quiere gestionar el tipo de su consulta.

Nombre de función

El nombre de este objeto de función. El nombre de la función debe coincidir con el alias de UDA.

Método init()

El método init() inicializa el estado del agregado. Este método se llama cuando se inicia la ventana.

Método accumulate()

El método accumulate() calcula el estado de UDA según el estado anterior y los valores de evento actuales. Este método se llama cuando un evento entra en una ventana de tiempo (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW o SESSIONWINDOW).

Método deaccumulate()

El método deaccumulate() vuelve a calcular el estado según el estado anterior y los valores de evento actuales. Este método se llama cuando un evento sale de una ventana SLIDINGWINDOW o SESSIONWINDOW.

Método deaccumulateState()

El método deaccumulateState() vuelve a calcular el estado según el estado anterior y el estado de un salto. Este método se llama cuando un conjunto de eventos sale de una ventana HOPPINGWINDOW.

Método computeResult()

El método computeResult() devuelve resultados de agregados basados en el estado actual. Este método se llama al final de una ventana de tiempo (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW o SESSIONWINDOW).

Tipos de datos de entrada y salida admitidos por UDA de JavaScript

En el caso de tipos de datos de UDA de JavaScript, consulte la sección Conversión de tipos de Stream Analytics y JavaScript de Integración de las UDF de JavaScript.

Adición de un UDA de JavaScript desde Azure Portal

Este artículo le guía por el proceso de creación de un UDA desde el portal. En el ejemplo que se usa aquí se calculan las medias ponderadas de tiempo.

Ahora vamos a crear un UDA de JavaScript en un trabajo de ASA mediante estos pasos.

  1. Inicie sesión en Azure Portal y busque el trabajo de Stream Analytics existente.

  2. A continuación, seleccione el vínculo de funciones en TOPOLOGÍA DE TRABAJO.

  3. Seleccione Agregar para agregar una nueva función.

  4. En la vista Nueva función, seleccione UDA de JavaScript como el tipo de función y verá que aparece una plantilla de UDA predeterminada en el editor.

  5. Como alias de UDA use "TWA" y cambie la implementación de función como se indica a continuación:

    // Sample UDA which calculate Time-Weighted Average of incoming values.
    function main() {
        this.init = function () {
            this.totalValue = 0.0;
            this.totalWeight = 0.0;
        }
    
        this.accumulate = function (value, timestamp) {
            this.totalValue += value.level * value.weight;
            this.totalWeight += value.weight;
    
        }
    
        // Uncomment below for AccumulateDeaccumulate implementation
        /*
        this.deaccumulate = function (value, timestamp) {
            this.totalValue -= value.level * value.weight;
            this.totalWeight -= value.weight;
        }
    
        this.deaccumulateState = function (otherState){
            this.state -= otherState.state;
            this.totalValue -= otherState.totalValue;
            this.totalWeight -= otherState.totalWeight;
        }
        */
    
        this.computeResult = function () {
            if(this.totalValue == 0) {
                result = 0;
            }
            else {
                result = this.totalValue/this.totalWeight;
            }
            return result;
        }
    }
    
  6. Tras seleccionar el botón "Guardar", el UDA aparece en la lista de funciones.

  7. Haga clic en la nueva función "TWA" para comprobar la definición de función.

Llamada de un UDA de JavaScript en una consulta de ASA

En Azure Portal, abra el trabajo, edite la consulta y llame a la función TWA() con el prefijo obligatorio "uda.". Por ejemplo:

WITH value AS
(
    SELECT
    NoiseLevelDB as level,
    DurationSecond as weight
FROM
    [YourInputAlias] TIMESTAMP BY EntryTime
)
SELECT
    System.Timestamp as ts,
    uda.TWA(value) as NoseDoseTWA
FROM value
GROUP BY TumblingWindow(minute, 5)

Prueba de la consulta con UDA

Cree un archivo JSON local con el contenido siguiente, cárguelo en el trabajo de Stream Analytics y pruebe la consulta anterior.

[
  {"EntryTime": "2017-06-10T05:01:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 22.0},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 81, "DurationSecond": 37.8},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 85, "DurationSecond": 26.3},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 95, "DurationSecond": 13.7},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 88, "DurationSecond": 10.3},
  {"EntryTime": "2017-06-10T05:05:00-07:00", "NoiseLevelDB": 103, "DurationSecond": 5.5},
  {"EntryTime": "2017-06-10T05:06:00-07:00", "NoiseLevelDB": 99, "DurationSecond": 23.0},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 1.76},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 79, "DurationSecond": 17.9},
  {"EntryTime": "2017-06-10T05:08:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 27.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 91, "DurationSecond": 17.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 115, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 28.3},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 55, "DurationSecond": 18.2},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 25.8},
  {"EntryTime": "2017-06-10T05:11:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 11.4},
  {"EntryTime": "2017-06-10T05:12:00-07:00", "NoiseLevelDB": 89, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 112, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 9.7},
  {"EntryTime": "2017-06-10T05:18:00-07:00", "NoiseLevelDB": 96, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 0.99},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 113, "DurationSecond": 25.1},
  {"EntryTime": "2017-06-10T05:22:00-07:00", "NoiseLevelDB": 110, "DurationSecond": 5.3}
]

Obtener ayuda

Para ayuda adicional, pruebe nuestra Página de preguntas y respuestas de Microsoft sobre Azure Stream Analytics.

Pasos siguientes