time_window_rolling_avg_fl()

Applies to: ✅ Microsoft Fabric ✅ Azure Data Explorer ✅ Azure Monitor ✅ Microsoft Sentinel

The function time_window_rolling_avg_fl() is a user-defined function (UDF) that calculates the rolling average of the required value over a constant duration time window.

For a regular time series (one with constant intervals between samples), a rolling average over a constant time window can be calculated with series_fir(), because the constant time window converts to a fixed-width filter of equal coefficients. For an irregular time series, the calculation is more complex, because the actual number of samples in the window varies. It can still be done by using the scan operator.

This type of rolling window calculation is required when metric values are emitted only when they change, rather than at constant intervals. For example, in IoT scenarios, edge devices send metrics to the cloud only upon changes in order to optimize communication bandwidth.
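For the regular-interval case, the series_fir() approach might look like the following minimal sketch. The data is hypothetical (ten samples, one per minute), and a 5-minute window is assumed to map to five equal coefficients. The last two arguments set normalize to true and center to false.

```kusto
// Hypothetical regular series: one sample per minute, values 1 through 10.
range i from 0 to 9 step 1
| extend ts = datetime(2021-11-29 08:00) + i * 1m, val = todouble(i + 1)
| sort by ts asc
// Pack the rows into arrays, because series_fir() operates on dynamic arrays.
| summarize ts = make_list(ts), val = make_list(val)
// Five equal coefficients, normalized: a backward-looking 5-sample moving average.
| extend rolling_avg = series_fir(val, dynamic([1, 1, 1, 1, 1]), true, false)
```

A fixed filter width is only valid here because every window holds the same number of samples, which is exactly the assumption that breaks for an irregular series.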

Syntax

T | invoke time_window_rolling_avg_fl(t_col, y_col, key_col, dt [, direction ])

Learn more about syntax conventions.

Parameters

| Name | Type | Required | Description |
|--|--|--|--|
| t_col | string | ✔️ | The name of the column containing the time stamp of the records. |
| y_col | string | ✔️ | The name of the column containing the metric value of the records. |
| key_col | string | ✔️ | The name of the column containing the partition key of the records. |
| dt | timespan | ✔️ | The duration of the rolling window. |
| direction | int | | The aggregation direction. The possible values are +1 (the rolling window extends forward from the current time) and -1 (the rolling window extends backward from the current time). The default is -1, because a backward rolling window is the only possible method for streaming scenarios. |

Function definition

You can define the function either by embedding its code as a query-defined function or by creating it as a stored function in your database. The query-defined form is shown here.

Define the function using the following let statement. No permissions are required.

Important

A let statement can't run on its own. It must be followed by a tabular expression statement. To run a working example of time_window_rolling_avg_fl(), see Example.

let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex 
    | partition hint.strategy=shuffle by key 
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc    
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with 
        (
            step s: true => cum_count = s.cum_count + delta, 
                            cum_sum = s.cum_sum + delta * value; 
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction 
        | project timestamp, value, avg_value, key
    )
};
// Write your query to use the function here.
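To see what the scan step consumes, the following sketch reproduces only the first three operators of the partition subquery on a few hypothetical rows for a single key. The names dt and direction are local stand-ins for the function parameters, and the input column names ts and val are assumptions for illustration.

```kusto
// Local stand-ins for the function parameters (backward 10-minute window).
let dt = 10m;
let direction = int(-1);
datatable(ts:datetime, val:real) [
    datetime(2021-11-29 08:00), 1,
    datetime(2021-11-29 08:01), 2,
    datetime(2021-11-29 08:05), 3
]
// Each sample becomes two events: one where it enters the window, one where it leaves.
| extend timestamp = pack_array(ts, ts - direction*dt), delta = pack_array(-direction, direction)
| mv-expand timestamp to typeof(datetime), delta to typeof(long)
// At equal timestamps, entering events (delta = 1) are ordered before leaving events (delta = -1).
| sort by timestamp asc, delta desc
| project timestamp, delta, val
```

With a backward window, each sample appears twice: once with delta = 1 at its own timestamp, when it enters the window, and once with delta = -1 at its timestamp plus dt, when it leaves. The scan operator then keeps running totals of delta (the sample count) and delta * value (the sum) over this ordered stream, and the final filter keeps only the rows that correspond to original samples.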

Example

The following example uses the invoke operator to run the function.

To use a query-defined function, invoke it after the embedded function definition.

let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex 
    | partition hint.strategy=shuffle by key 
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc    
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with 
        (
            step s: true => cum_count = s.cum_count + delta, 
                            cum_sum = s.cum_sum + delta * value; 
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction 
        | project timestamp, value, avg_value, key
    )
};
let tbl = datatable(ts:datetime,  val:real, key:string) [
    datetime(8:00), 1, 'Device1',
    datetime(8:01), 2, 'Device1',
    datetime(8:05), 3, 'Device1',
    datetime(8:05), 10, 'Device2',
    datetime(8:09), 20, 'Device2',
    datetime(8:40), 4, 'Device1',
    datetime(9:00), 5, 'Device1',
    datetime(9:01), 6, 'Device1',
    datetime(9:05), 30, 'Device2',
    datetime(9:50), 7, 'Device1'
];
tbl
| invoke time_window_rolling_avg_fl('ts', 'val', 'key', 10m)

Output

| timestamp | value | avg_value | key |
|--|--|--|--|
| 2021-11-29 08:05:00.0000000 | 10 | 10 | Device2 |
| 2021-11-29 08:09:00.0000000 | 20 | 15 | Device2 |
| 2021-11-29 09:05:00.0000000 | 30 | 30 | Device2 |
| 2021-11-29 08:00:00.0000000 | 1 | 1 | Device1 |
| 2021-11-29 08:01:00.0000000 | 2 | 1.5 | Device1 |
| 2021-11-29 08:05:00.0000000 | 3 | 2 | Device1 |
| 2021-11-29 08:40:00.0000000 | 4 | 4 | Device1 |
| 2021-11-29 09:00:00.0000000 | 5 | 5 | Device1 |
| 2021-11-29 09:01:00.0000000 | 6 | 5.5 | Device1 |
| 2021-11-29 09:50:00.0000000 | 7 | 7 | Device1 |

The first avg_value (10), at 8:05, is based on a single sample, because only one Device2 sample falls within the 10-minute backward window. The second avg_value (15) is the average of the two Device2 samples at 8:05 and 8:09, and so on.
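A forward-looking window can be requested by passing the optional direction argument. The following sketch reuses the function definition and the Device1 rows from the example above, and passes int(1) as the direction. It's an illustration only, and the result described after it follows from the parameter description rather than from recorded output.

```kusto
let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex 
    | partition hint.strategy=shuffle by key 
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc    
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with 
        (
            step s: true => cum_count = s.cum_count + delta, 
                            cum_sum = s.cum_sum + delta * value; 
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction 
        | project timestamp, value, avg_value, key
    )
};
// Subset of the sample data: Device1 rows only.
let tbl = datatable(ts:datetime,  val:real, key:string) [
    datetime(8:00), 1, 'Device1',
    datetime(8:01), 2, 'Device1',
    datetime(8:05), 3, 'Device1',
    datetime(8:40), 4, 'Device1'
];
tbl
// The fifth argument sets direction to +1 (forward window).
| invoke time_window_rolling_avg_fl('ts', 'val', 'key', 10m, int(1))
```

With direction set to 1, each row's window covers the 10 minutes after its timestamp, so the 8:00 row would be expected to average the samples at 8:00, 8:01, and 8:05, while the 8:40 row would have only its own value in the window.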