Delen via


time_window_rolling_avg_fl()

De functie time_window_rolling_avg_fl() is een door de gebruiker gedefinieerde functie (UDF) die het voortschrijdende gemiddelde van de vereiste waarde berekent over een tijdvenster met een constante duur.

Het berekenen van het voortschrijdende gemiddelde over een constant tijdvenster voor reguliere tijdreeksen (dat wil gezegd, met constante intervallen) kan worden bereikt met behulp van series_fir(), omdat het constante tijdvenster kan worden geconverteerd naar een filter met vaste breedte van gelijke coëfficiënten. Het berekenen ervan voor onregelmatige tijdreeksen is echter complexer, omdat het werkelijke aantal steekproeven in het venster varieert. Toch kan het worden bereikt met behulp van de krachtige scanoperator .

Dit type rolling window-berekening is vereist voor gebruiksvoorbeelden waarbij de metrische waarden alleen worden verzonden wanneer ze worden gewijzigd (en niet met constante intervallen). Bijvoorbeeld in IoT, waarbij edge-apparaten alleen bij wijzigingen metrische gegevens naar de cloud verzenden, waardoor de communicatiebandbreedte wordt geoptimaliseerd.

Syntax

T | invoke time_window_rolling_avg_fl(, t_col, y_col, key_coldt [,richting ])

Meer informatie over syntaxisconventies.

Parameters

Naam Type Vereist Beschrijving
t_col string ✔️ De naam van de kolom die het tijdstempel van de records bevat.
y_col string ✔️ De naam van de kolom die de metrische waarde van de records bevat.
key_col string ✔️ De naam van de kolom die de partitiesleutel van de records bevat.
Dt timespan ✔️ De duur van het lopende venster.
direction int De aggregatierichting. De mogelijke waarden zijn +1 of -1. Er wordt een doorlopend venster ingesteld vanaf de huidige tijd respectievelijk vooruit/achteruit. De standaardwaarde is -1, omdat achterwaarts rollend venster de enige mogelijke methode is voor streamingscenario's.

Functiedefinitie

U kunt de functie als volgt definiëren door de code in te sluiten als een door een query gedefinieerde functie of door deze te maken als een opgeslagen functie in uw database:

Definieer de functie met behulp van de volgende let-instructie. Er zijn geen machtigingen vereist.

Belangrijk

Een let-instructie kan niet zelfstandig worden uitgevoerd. Deze moet worden gevolgd door een tabellaire expressie-instructie. Zie Voorbeeld als u een werkend voorbeeld van time_window_rolling_avg_fl()wilt uitvoeren.

let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex 
    | partition hint.strategy=shuffle by key 
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc    
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with 
        (
            step s: true => cum_count = s.cum_count + delta, 
                            cum_sum = s.cum_sum + delta * value; 
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction 
        | project timestamp, value, avg_value, key
    )
};
// Write your query to use the function here.

Voorbeeld

In het volgende voorbeeld wordt de operator aanroepen gebruikt om de functie uit te voeren.

Als u een querygedefinieerde functie wilt gebruiken, roept u deze aan na de definitie van de ingesloten functie.

let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex 
    | partition hint.strategy=shuffle by key 
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc    
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with 
        (
            step s: true => cum_count = s.cum_count + delta, 
                            cum_sum = s.cum_sum + delta * value; 
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction 
        | project timestamp, value, avg_value, key
    )
};
let tbl = datatable(ts:datetime,  val:real, key:string) [
    datetime(8:00), 1, 'Device1',
    datetime(8:01), 2, 'Device1',
    datetime(8:05), 3, 'Device1',
    datetime(8:05), 10, 'Device2',
    datetime(8:09), 20, 'Device2',
    datetime(8:40), 4, 'Device1',
    datetime(9:00), 5, 'Device1',
    datetime(9:01), 6, 'Device1',
    datetime(9:05), 30, 'Device2',
    datetime(9:50), 7, 'Device1'
];
tbl
| invoke time_window_rolling_avg_fl('ts', 'val', 'key', 10m)

Uitvoer

tijdstempel waarde avg_value sleutel
2021-11-29 08:05:00.0000000 10 10 Apparaat2
2021-11-29 08:09:00.0000000 20 15 Apparaat2
2021-11-29 09:05:00.0000000 30 30 Apparaat2
2021-11-29 08:00:00.0000000 1 1 Apparaat1
2021-11-29 08:01:00.0000000 2 1,5 Apparaat1
2021-11-29 08:05:00.0000000 3 2 Apparaat1
2021-11-29 08:40:00.0000000 4 4 Apparaat1
2021-11-29 09:00:00.0000000 5 5 Apparaat1
2021-11-29 09:01:00.0000000 6 5.5 Apparaat1
2021-11-29 09:50:00.0000000 7 7 Apparaat1

De eerste waarde (10) om 8:05 bevat slechts één waarde, die in het achterwaartse venster van 10 minuten is gedaald, de tweede waarde (15) is het gemiddelde van twee steekproeven om 8:09 en om 8:05, enzovoort.