Udostępnij za pośrednictwem


time_window_rolling_avg_fl()

Funkcja time_window_rolling_avg_fl() jest funkcją zdefiniowaną przez użytkownika (UDF), która oblicza średnią kroczącą wymaganej wartości w stałym przedziale czasu trwania.

Obliczanie średniej kroczącej w stałym przedziale czasu dla zwykłych szeregów czasowych (czyli o stałych interwałach) można osiągnąć przy użyciu series_fir(), ponieważ stałe przedziały czasu można przekonwertować na stały filtr szerokości równych współczynników. Jednak obliczanie go dla nieregularnych szeregów czasowych jest bardziej złożone, ponieważ rzeczywista liczba próbek w oknie jest różna. Nadal można to osiągnąć przy użyciu zaawansowanego operatora skanowania .

Ten typ obliczania okna kroczącego jest wymagany w przypadkach użycia, w których wartości metryk są emitowane tylko w przypadku zmiany (a nie w stałych odstępach czasu). Na przykład w usłudze IoT, gdzie urządzenia brzegowe wysyłają metryki do chmury tylko po zmianach, optymalizując przepustowość komunikacji.

Składnia

T | invoke time_window_rolling_avg_fl(, t_col, y_col, key_coldt [,kierunek ])

Dowiedz się więcej o konwencjach składniowych.

Parametry

Nazwa Typ Wymagane Opis
t_col string ✔️ Nazwa kolumny zawierającej sygnaturę czasową rekordów.
y_col string ✔️ Nazwa kolumny zawierającej wartość metryki rekordów.
key_col string ✔️ Nazwa kolumny zawierającej klucz partycji rekordów.
Dt timespan ✔️ Czas trwania okna operacyjnego.
Kierunku int Kierunek agregacji. Możliwe wartości to +1 lub -1. Okno kroczące jest ustawiane odpowiednio z bieżącego czasu do przodu/do tyłu. Wartość domyślna to -1, ponieważ okno wycofywania wstecznego jest jedyną możliwą metodą dla scenariuszy przesyłania strumieniowego.

Definicja funkcji

Funkcję można zdefiniować, osadzając jej kod jako funkcję zdefiniowaną przez zapytanie lub tworząc ją jako funkcję przechowywaną w bazie danych w następujący sposób:

Zdefiniuj funkcję przy użyciu następującej instrukcji let. Nie są wymagane żadne uprawnienia.

Ważne

Instrukcja let nie może być uruchamiana samodzielnie. Po nim musi znajdować się instrukcja wyrażenia tabelarycznego. Aby uruchomić działający przykład time_window_rolling_avg_fl()polecenia , zobacz Przykład.

let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex 
    | partition hint.strategy=shuffle by key 
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc    
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with 
        (
            step s: true => cum_count = s.cum_count + delta, 
                            cum_sum = s.cum_sum + delta * value; 
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction 
        | project timestamp, value, avg_value, key
    )
};
// Write your query to use the function here.

Przykład

W poniższym przykładzie użyto operatora invoke do uruchomienia funkcji.

Aby użyć funkcji zdefiniowanej przez zapytanie, wywołaj ją po definicji funkcji osadzonej.

let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex 
    | partition hint.strategy=shuffle by key 
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc    
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with 
        (
            step s: true => cum_count = s.cum_count + delta, 
                            cum_sum = s.cum_sum + delta * value; 
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction 
        | project timestamp, value, avg_value, key
    )
};
let tbl = datatable(ts:datetime,  val:real, key:string) [
    datetime(8:00), 1, 'Device1',
    datetime(8:01), 2, 'Device1',
    datetime(8:05), 3, 'Device1',
    datetime(8:05), 10, 'Device2',
    datetime(8:09), 20, 'Device2',
    datetime(8:40), 4, 'Device1',
    datetime(9:00), 5, 'Device1',
    datetime(9:01), 6, 'Device1',
    datetime(9:05), 30, 'Device2',
    datetime(9:50), 7, 'Device1'
];
tbl
| invoke time_window_rolling_avg_fl('ts', 'val', 'key', 10m)

Dane wyjściowe

sygnatura czasowa wartość avg_value key
2021-11-29 08:05:00.0000000 10 10 Urządzenie 2
2021-11-29 08:09:00.0000000 20 15 Urządzenie 2
2021-11-29 09:05:00.0000000 30 30 Urządzenie 2
2021-11-29 08:00:00.0000000 1 1 Urządzenie1
2021-11-29 08:01:00.0000000 2 1,5 Urządzenie1
2021-11-29 08:05:00.0000000 3 2 Urządzenie1
2021-11-29 08:40:00.0000000 4 4 Urządzenie1
2021-11-29 09:00:00.0000000 5 5 Urządzenie1
2021-11-29 09:01:00.0000000 6 5,5 Urządzenie1
2021-11-29 09:50:00.0000000 7 7 Urządzenie1

Pierwsza wartość (10) o wartości 8:05 zawiera tylko jedną wartość, która spadła w 10-minutowym przedziale wstecz, druga wartość (15) to średnia dwóch próbek na poziomie 8:09 i 8:05 itd.