Sdílet prostřednictvím


time_window_rolling_avg_fl()

Funkce time_window_rolling_avg_fl() je uživatelem definovaná funkce (UDF), která vypočítá klouzavý průměr požadované hodnoty v časovém intervalu konstantní doby trvání.

Výpočet klouzavého průměru v rámci konstantního časového intervalu pro běžnou časovou řadu (tj. s konstantními intervaly) lze dosáhnout pomocí series_fir(), protože interval konstantního času lze převést na filtr pevné šířky stejných koeficientů. Výpočet nepravidelných časových řad je ale složitější, protože skutečný počet vzorků v okně se liší. Přesto toho lze dosáhnout pomocí výkonného operátoru skenování .

Tento typ výpočtu posuvného okna se vyžaduje pro případy použití, kdy se hodnoty metrik generují pouze při změně (a ne v konstantních intervalech). Například v IoT, kde hraniční zařízení odesílají metriky do cloudu jenom při změnách a optimalizují šířku pásma komunikace.

Syntaxe

T | invoke time_window_rolling_avg_fl(t_col y_col key_col, ,dt [, směr ], )

Přečtěte si další informace o konvencích syntaxe.

Parametry

Název Type Požadováno Popis
t_col string ✔️ Název sloupce obsahujícího časové razítko záznamů.
y_col string ✔️ Název sloupce obsahujícího hodnotu metriky záznamů.
key_col string ✔️ Název sloupce, který obsahuje klíč oddílu záznamů.
Dt timespan ✔️ Doba trvání posuvného okna.
direction int Směr agregace. Možné hodnoty jsou +1 nebo -1. Posuvné okno se nastavuje od aktuálního času dopředu/dozadu. Výchozí hodnota je -1, protože zpětné posuvné okno je jedinou možnou metodou pro scénáře streamování.

Definice funkce

Funkci můžete definovat vložením jejího kódu jako funkce definovanou dotazem nebo vytvořením jako uložené funkce v databázi následujícím způsobem:

Definujte funkci pomocí následujícího příkazu let. Nejsou vyžadována žádná oprávnění.

Důležité

Příkaz let nemůže běžet samostatně. Musí následovat příkaz tabulkového výrazu. Pokud chcete spustit funkční příklad, podívejte se na příkladtime_window_rolling_avg_fl().

let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex 
    | partition hint.strategy=shuffle by key 
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc    
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with 
        (
            step s: true => cum_count = s.cum_count + delta, 
                            cum_sum = s.cum_sum + delta * value; 
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction 
        | project timestamp, value, avg_value, key
    )
};
// Write your query to use the function here.

Příklad

Následující příklad používá operátor invoke ke spuštění funkce.

Pokud chcete použít funkci definovanou dotazem, vyvoláte ji po definici vložené funkce.

let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex 
    | partition hint.strategy=shuffle by key 
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc    
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with 
        (
            step s: true => cum_count = s.cum_count + delta, 
                            cum_sum = s.cum_sum + delta * value; 
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction 
        | project timestamp, value, avg_value, key
    )
};
let tbl = datatable(ts:datetime,  val:real, key:string) [
    datetime(8:00), 1, 'Device1',
    datetime(8:01), 2, 'Device1',
    datetime(8:05), 3, 'Device1',
    datetime(8:05), 10, 'Device2',
    datetime(8:09), 20, 'Device2',
    datetime(8:40), 4, 'Device1',
    datetime(9:00), 5, 'Device1',
    datetime(9:01), 6, 'Device1',
    datetime(9:05), 30, 'Device2',
    datetime(9:50), 7, 'Device1'
];
tbl
| invoke time_window_rolling_avg_fl('ts', 'val', 'key', 10m)

Výstup

časové razítko hodnota avg_value key
2021-11-29 08:05:00.0000000 10 10 Zařízení 2
2021-11-29 08:09:00.0000000 20 15 Zařízení 2
2021-11-29 09:05:00.0000000 30 30 Zařízení 2
2021-11-29 08:00:00.0000000 1 1 Zařízení 1
2021-11-29 08:01:00.0000000 2 1.5 Zařízení 1
2021-11-29 08:05:00.0000000 3 2 Zařízení 1
2021-11-29 08:40:00.0000000 4 4 Zařízení 1
2021-11-29 09:00:00.0000000 5 5 Zařízení 1
2021-11-29 09:01:00.0000000 6 5.5 Zařízení 1
2021-11-29 09:50:00.0000000 7 7 Zařízení 1

První hodnota (10) v 8:05 obsahuje pouze jednu hodnotu, která klesla v 10minutovém zpětném okně, druhá hodnota (15) je průměr dvou vzorků v 8:09 a v 8:05 atd.