Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Azure Stream Analytics unterstützt in JavaScript geschriebene, benutzerdefinierte Aggregate (UDA), mit deren Hilfe Sie komplexe zustandsbehaftete Geschäftslogik implementieren können. Innerhalb von UDA haben Sie die vollständige Kontrolle über die Zustandsdatenstruktur, die Zustandsakkumulation/-deakkumulation und die Berechnung des Aggregatergebnisses. Der Artikel beschreibt die beiden unterschiedlichen JavaScript-UDA-Schnittstellen, Schritte zum Erstellen eines UDA sowie die Verwendung von UDA mit Windows-basierten Vorgängen in Stream Analytics-Abfragen.
Ein benutzerdefiniertes Aggregat wird im Rahmen einer Zeitfensterspezifikation verwendet, um die Ereignisse in diesem Fenster zu aggregieren und einen einzelnen Ergebniswert zu erzeugen. Es gibt zwei Typen von UDA-Schnittstellen, die zurzeit von Stream Analytics unterstützt werden: AccumulateOnly und AccumulateDeaccumulate. Beide Typen von UDA können mit rollierenden, springenden, gleitenden und Sitzungsfenstern verwendet werden. AccumulateDeaccumulate-UDA bietet für springende, gleitende und Sitzungsfenster eine bessere Leistung als AccumulateOnly-UDA. Sie wählen anhand des verwendeten Algorithmus zwischen den beiden Typen.
AccumulateOnly Aggregate können nur neue Ereignisse für den Zustand akkumulieren, eine Dekumulation von Werten lässt der Algorithmus nicht zu. Wählen Sie diesen Aggregattyp aus, wenn eine Dekumulation der Informationen zu einem Ereignis aus dem Zustandswert nicht implementiert werden kann. Im Folgenden finden Sie die JavaScript-Vorlage für AccumulateOnly-Aggregate:
// Sample UDA which state can only be accumulated.
function main() {
this.init = function () {
this.state = 0;
}
this.accumulate = function (value, timestamp) {
this.state += value;
}
this.computeResult = function () {
return this.state;
}
}
AccumulateDeaccumulate Aggregate ermöglichen die Dekumulation eines zuvor akkumulierten Werts aus dem Zustand, z.B. das Entfernen eines Schlüssel-Wert-Paars aus einer Liste von Ereigniswerte oder das Subtrahieren eines Werts von einem Zustand der aggregierten Summe. Im Folgenden finden Sie die JavaScript-Vorlage für AccumulateDeaccumulate-Aggregate:
// Sample UDA which state can be accumulated and deaccumulated.
function main() {
this.init = function () {
this.state = 0;
}
this.accumulate = function (value, timestamp) {
this.state += value;
}
this.deaccumulate = function (value, timestamp) {
this.state -= value;
}
this.deaccumulateState = function (otherState){
this.state -= otherState.state;
}
this.computeResult = function () {
return this.state;
}
}
Jedes JavaScript-UDA wird durch eine Funktionsobjektdeklaration definiert. Es folgen die Hauptelemente einer UDA-Definition.
Der Funktionsalias ist der UDA-Bezeichner. Verwenden Sie beim Aufrufen in einer Stream Analytics-Abfrage immer den UDA-Alias zusammen mit „uda.“ als Präfix.
Für UDA muss der Funktionstyp Javascript UDA sein.
Eine bestimmter vom Stream Analytics-Auftrag unterstützter Typ oder „Any“, wenn Sie den Typ in Ihrer Abfrage behandeln möchten.
Der Name dieses Funktionsobjekts. Der Funktionsname muss mit dem UDA-Alias identisch sein.
Die init()-Methode initialisiert den Zustand des Aggregats. Diese Methode wird beim Starten des Fensters aufgerufen.
Die accumulate()-Methode berechnet den UDA-Zustand basierend auf dem vorherigen Zustand und den aktuellen Ereigniswerten. Diese Methode wird aufgerufen, wenn ein Ereignis in ein Zeitfenster (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW oder SESSIONWINDOW) eintritt.
Die deaccumulate()-Methode berechnet den Zustand basierend auf dem vorherigen Zustand und den aktuellen Ereigniswerten. Diese Methode wird aufgerufen, wenn ein Ereignis ein SLIDINGWINDOW oder SESSIONWINDOW verlässt.
Die deaccumulateState()-Methode berechnet den Zustand basierend auf dem vorherigen Zustand und dem Zustand eines Hops. Diese Methode wird aufgerufen, wenn ein Satz von Ereignissen ein HOPPINGWINDOW verlässt.
Die computeResult()-Methode gibt das aggregierte Ergebnis basierend auf dem aktuellen Zustand zurück. Diese Methode wird am Ende eines Zeitfensters (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW oder SESSIONWINDOW) aufgerufen.
Informationen zu JavaScript-UDA-Datentypen finden Sie im Abschnitt Stream Analytics- und JavaScript-Typkonvertierung unter Integrieren von JavaScript-UDFs.
Im Folgenden finden Sie eine exemplarische Vorgehensweise für die Erstellung eines UDA über das Portal. Im hier verwendeten Beispiel werden die zeitlich gewichteten Durchschnittswerte berechnet.
Nun erstellen wir anhand der folgenden Schritte ein JavaScript-UDA unter einem vorhandenen ASA-Auftrag.
Melden Sie sich beim Azure-Portal an, und suchen Sie Ihren vorhandenen Stream Analytics-Auftrag.
Wählen Sie dann den Funktionslink unter AUFTRAGSTOPOLOGIE aus.
Wählen Sie Hinzufügen aus, um eine neue Funktion hinzuzufügen.
Wählen Sie in der Ansicht „Neue Funktion“ den Funktionstyp JavaScript-UDA aus. Daraufhin wird im Editor eine UDA-Standardvorlage angezeigt.
Geben Sie „TWA“ als UDA-Alias ein, und ändern Sie die Funktionsimplementierung wie im Folgenden angegeben:
// Sample UDA which calculate Time-Weighted Average of incoming values. function main() { this.init = function () { this.totalValue = 0.0; this.totalWeight = 0.0; } this.accumulate = function (value, timestamp) { this.totalValue += value.level * value.weight; this.totalWeight += value.weight; } // Uncomment below for AccumulateDeaccumulate implementation /* this.deaccumulate = function (value, timestamp) { this.totalValue -= value.level * value.weight; this.totalWeight -= value.weight; } this.deaccumulateState = function (otherState){ this.state -= otherState.state; this.totalValue -= otherState.totalValue; this.totalWeight -= otherState.totalWeight; } */ this.computeResult = function () { if(this.totalValue == 0) { result = 0; } else { result = this.totalValue/this.totalWeight; } return result; } }
Sobald Sie auf die Schaltfläche „Speichern“ klicken, wird das UDA in der Liste der Funktionen angezeigt.
Wenn Sie auf die neue Funktion „TWA“ klicken, können Sie die Funktionsdefinition überprüfen.
Öffnen Sie Ihren Auftrag im Azure-Portal, bearbeiten Sie die Abfrage, und rufen Sie die TWA()-Funktion mit dem obligatorischen Präfix „uda.“ auf. Beispiel:
WITH value AS
(
SELECT
NoiseLevelDB as level,
DurationSecond as weight
FROM
[YourInputAlias] TIMESTAMP BY EntryTime
)
SELECT
System.Timestamp as ts,
uda.TWA(value) as NoseDoseTWA
FROM value
GROUP BY TumblingWindow(minute, 5)
Erstellen Sie eine lokale JSON-Datei mit dem nachstehenden Inhalt, laden Sie die Datei in einen Stream Analytics-Auftrag hoch, und testen Sie die vorstehende Abfrage.
[
{"EntryTime": "2017-06-10T05:01:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 22.0},
{"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 81, "DurationSecond": 37.8},
{"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 85, "DurationSecond": 26.3},
{"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 95, "DurationSecond": 13.7},
{"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 88, "DurationSecond": 10.3},
{"EntryTime": "2017-06-10T05:05:00-07:00", "NoiseLevelDB": 103, "DurationSecond": 5.5},
{"EntryTime": "2017-06-10T05:06:00-07:00", "NoiseLevelDB": 99, "DurationSecond": 23.0},
{"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 1.76},
{"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 79, "DurationSecond": 17.9},
{"EntryTime": "2017-06-10T05:08:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 27.1},
{"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 91, "DurationSecond": 17.1},
{"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 115, "DurationSecond": 7.9},
{"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 28.3},
{"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 55, "DurationSecond": 18.2},
{"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 25.8},
{"EntryTime": "2017-06-10T05:11:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 11.4},
{"EntryTime": "2017-06-10T05:12:00-07:00", "NoiseLevelDB": 89, "DurationSecond": 7.9},
{"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 112, "DurationSecond": 3.7},
{"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 9.7},
{"EntryTime": "2017-06-10T05:18:00-07:00", "NoiseLevelDB": 96, "DurationSecond": 3.7},
{"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 0.99},
{"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 113, "DurationSecond": 25.1},
{"EntryTime": "2017-06-10T05:22:00-07:00", "NoiseLevelDB": 110, "DurationSecond": 5.3}
]
Weitere Hilfe finden Sie auf der Frageseite von Microsoft Q&A (Fragen und Antworten) zu Azure Stream Analytics.
- Einführung in Azure Stream Analytics
- Erste Schritte mit Azure Stream Analytics
- Skalieren von Azure Stream Analytics-Aufträgen
- Stream Analytics Query Language Reference (Referenz zur Stream Analytics-Abfragesprache)
- Azure Stream Analytics management REST API reference (Referenz zur Azure Stream Analytics-Verwaltungs-REST-API)