Door de gebruiker gedefinieerde Azure Stream Analytics JavaScript-aggregaties

Azure Stream Analytics biedt ondersteuning voor door de gebruiker gedefinieerde aggregaties (UDA) die zijn geschreven in JavaScript. Hiermee kunt u complexe stateful bedrijfslogica implementeren. Binnen UDA hebt u volledige controle over de statusgegevensstructuur, statusaccumulatie, statusafcumulatie en cumulatieve resultaatberekening. In het artikel worden de twee verschillende JavaScript UDA-interfaces geïntroduceerd, stappen voor het maken van een UDA en het gebruik van UDA met op vensters gebaseerde bewerkingen in de Stream Analytics-query.

Door de gebruiker gedefinieerde JavaScript-aggregaties

Een door de gebruiker gedefinieerde statistische functie wordt gebruikt boven op een tijdvensterspecificatie om de gebeurtenissen in dat venster samen te voegen en één resultaatwaarde te produceren. Er zijn twee typen UDA-interfaces die Stream Analytics momenteel ondersteunt, AccumulateOnly en AccumulateDeaccumulate. Beide typen UDA kunnen worden gebruikt door Tumbling, Hopping, Sliding en Session Window. AccumulateDeaccumulate UDA presteert beter dan AccumulateOnly UDA bij gebruik samen met Hopping, Sliding en Session Window. U kiest een van de twee typen op basis van het algoritme dat u gebruikt.

Aggregaten van AccumulateOnly

AggregateOnly kan alleen nieuwe gebeurtenissen verzamelen tot de status, maar het algoritme staat deaccumulatie van waarden niet toe. Kies dit aggregatietype wanneer u een gebeurtenisgegevens van de statuswaarde niet kunt implementeren. Hier volgt de JavaScript-sjabloon voor AccumulatOnly-aggregaties:

// Sample UDA which state can only be accumulated.
function main() {
    this.init = function () {
        this.state = 0;
    }

    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    this.computeResult = function () {
        return this.state;
    }
}

Aggregaten accumulateDeaccumulate

Met aggregaten accumulateDeaccumulate kunt u deaccumulatie van een eerdere samengevoegde waarde uit de status verwijderen, bijvoorbeeld een sleutel-waardepaar verwijderen uit een lijst met gebeurteniswaarden of een waarde aftrekken van een somaggregaties. Hier volgt de JavaScript-sjabloon voor AggregateDeaccumulate:

// Sample UDA which state can be accumulated and deaccumulated.
function main() {
    this.init = function () {
        this.state = 0;
    }

    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    this.deaccumulate = function (value, timestamp) {
        this.state -= value;
    }

    this.deaccumulateState = function (otherState){
        this.state -= otherState.state;
    }

    this.computeResult = function () {
        return this.state;
    }
}

UDA - Declaratie van JavaScript-functie

Elke JavaScript-UDA wordt gedefinieerd door een functieobjectdeclaratie. Hieronder volgen de belangrijkste elementen in een UDA-definitie.

Functiealias

De functiealias is de UDA-id. Wanneer u wordt aangeroepen in de Stream Analytics-query, gebruikt u altijd UDA-alias samen met een voorvoegsel 'uda'.

Functietype

Voor UDA moet het functietype JavaScript-UDA zijn.

Uitvoertype

Een specifiek type dat door de Stream Analytics-taak wordt ondersteund of 'Any' als u het type in uw query wilt verwerken.

Functienaam

De naam van dit functieobject. De functienaam moet overeenkomen met de UDA-alias.

Methode - init()

De init()-methode initialiseert de status van de statistische functie. Deze methode wordt aangeroepen wanneer het venster wordt gestart.

Methode – accumulate()

De methode accumulate() berekent de UDA-status op basis van de vorige status en de huidige gebeurteniswaarden. Deze methode wordt aangeroepen wanneer een gebeurtenis een tijdvenster binnenkomt (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW of SESSIONWINDOW).

Methode – deaccumulate()

De methode deaccumulate() berekent de status opnieuw op basis van de vorige status en de huidige gebeurteniswaarden. Deze methode wordt aangeroepen wanneer een gebeurtenis een SLIDINGWINDOW of SESSIONWINDOW verlaat.

Methode – deaccumulateState()

De methode deaccumulateState() berekent de status opnieuw op basis van de vorige status en de status van een hop. Deze methode wordt aangeroepen wanneer een reeks gebeurtenissen een HOPPINGWINDOW verlaat.

Methode – computeResult()

De methode computeResult() retourneert een geaggregeerd resultaat op basis van de huidige status. Deze methode wordt aangeroepen aan het einde van een tijdvenster (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW of SESSIONWINDOW).

JavaScript UDA ondersteunde invoer- en uitvoergegevenstypen

Raadpleeg voor JavaScript UDA-gegevenstypen de sectie Stream Analytics en javaScript-typeconversie van JavaScript UDF's integreren.

Een JavaScript-UDA toevoegen vanuit Azure Portal

Hieronder doorlopen we het proces voor het maken van een UDA vanuit de portal. Het voorbeeld dat we hier gebruiken, zijn rekentijdgewogen gemiddelden.

We gaan nu een JavaScript-UDA maken onder een bestaande ASA-taak door de volgende stappen uit te voeren.

  1. Meld u aan bij Azure Portal en zoek uw bestaande Stream Analytics-taak.

  2. Selecteer vervolgens de functiekoppeling onder TAAKTOPOLOGIE.

  3. Selecteer Toevoegen om een nieuwe functie toe te voegen.

  4. Selecteer in de weergave Nieuwe functie javaScript-UDA als het functietype. Vervolgens ziet u dat er een standaard-UDA-sjabloon wordt weergegeven in de editor.

  5. Vul TWA in als de UDA-alias en wijzig de implementatie van de functie als volgt:

    // Sample UDA which calculate Time-Weighted Average of incoming values.
    function main() {
        this.init = function () {
            this.totalValue = 0.0;
            this.totalWeight = 0.0;
        }
    
        this.accumulate = function (value, timestamp) {
            this.totalValue += value.level * value.weight;
            this.totalWeight += value.weight;
    
        }
    
        // Uncomment below for AccumulateDeaccumulate implementation
        /*
        this.deaccumulate = function (value, timestamp) {
            this.totalValue -= value.level * value.weight;
            this.totalWeight -= value.weight;
        }
    
        this.deaccumulateState = function (otherState){
            this.state -= otherState.state;
            this.totalValue -= otherState.totalValue;
            this.totalWeight -= otherState.totalWeight;
        }
        */
    
        this.computeResult = function () {
            if(this.totalValue == 0) {
                result = 0;
            }
            else {
                result = this.totalValue/this.totalWeight;
            }
            return result;
        }
    }
    
  6. Zodra u de knop Opslaan hebt geselecteerd, wordt uw UDA weergegeven in de lijst met functies.

  7. Selecteer de nieuwe functie 'TWA', u kunt de functiedefinitie controleren.

JavaScript-UDA aanroepen in ASA-query

Bewerk de query en roep de TWA()-functie aan met het mandaatvoorvoegsel 'uda'. Voorbeeld:

WITH value AS
(
    SELECT
    NoiseLevelDB as level,
    DurationSecond as weight
FROM
    [YourInputAlias] TIMESTAMP BY EntryTime
)
SELECT
    System.Timestamp as ts,
    uda.TWA(value) as NoseDoseTWA
FROM value
GROUP BY TumblingWindow(minute, 5)

Query testen met UDA

Maak een lokaal JSON-bestand met onderstaande inhoud, upload het bestand naar de Stream Analytics-taak en test de bovenstaande query.

[
  {"EntryTime": "2017-06-10T05:01:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 22.0},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 81, "DurationSecond": 37.8},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 85, "DurationSecond": 26.3},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 95, "DurationSecond": 13.7},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 88, "DurationSecond": 10.3},
  {"EntryTime": "2017-06-10T05:05:00-07:00", "NoiseLevelDB": 103, "DurationSecond": 5.5},
  {"EntryTime": "2017-06-10T05:06:00-07:00", "NoiseLevelDB": 99, "DurationSecond": 23.0},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 1.76},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 79, "DurationSecond": 17.9},
  {"EntryTime": "2017-06-10T05:08:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 27.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 91, "DurationSecond": 17.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 115, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 28.3},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 55, "DurationSecond": 18.2},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 25.8},
  {"EntryTime": "2017-06-10T05:11:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 11.4},
  {"EntryTime": "2017-06-10T05:12:00-07:00", "NoiseLevelDB": 89, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 112, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 9.7},
  {"EntryTime": "2017-06-10T05:18:00-07:00", "NoiseLevelDB": 96, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 0.99},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 113, "DurationSecond": 25.1},
  {"EntryTime": "2017-06-10T05:22:00-07:00", "NoiseLevelDB": 110, "DurationSecond": 5.3}
]

Hulp vragen

Probeer onze Microsoft Q&A-vragenpagina voor Azure Stream Analytics voor meer hulp.

Volgende stappen