time_window_rolling_avg_fl()
Applies to: ✅ Microsoft Fabric ✅ Azure Data Explorer ✅ Azure Monitor ✅ Microsoft Sentinel
The function time_window_rolling_avg_fl()
is a user-defined function (UDF) that calculates the rolling average of the required value over a constant duration time window.
Calculating rolling average over a constant time window for regular time series (that is, having constant intervals) can be achieved using series_fir(), as the constant time window can be converted to a fixed width filter of equal coefficients. However, calculating it for irregular time series is more complex, as the actual number of samples in the window varies. Still it can be achieved using the powerful scan operator.
This type of rolling window calculation is required for use cases where the metric values are emitted only when changed (and not in constant intervals). For example in IoT, where edge devices send metrics to the cloud only upon changes, optimizing communication bandwidth.
Syntax
T | invoke time_window_rolling_avg_fl(
t_col,
y_col,
key_col,
dt [,
direction ])
Learn more about syntax conventions.
Parameters
Name | Type | Required | Description |
---|---|---|---|
t_col | string |
✔️ | The name of the column containing the time stamp of the records. |
y_col | string |
✔️ | The name of the column containing the metric value of the records. |
key_col | string |
✔️ | The name of the column containing the partition key of the records. |
dt | timespan |
✔️ | The duration of the rolling window. |
direction | int |
The aggregation direction. The possible values are +1 or -1. A rolling window is set from current time forward/backward respectively. Default is -1, as backward rolling window is the only possible method for streaming scenarios. |
Function definition
You can define the function by either embedding its code as a query-defined function, or creating it as a stored function in your database, as follows:
Define the function using the following let statement. No permissions are required.
Important
A let statement can't run on its own. It must be followed by a tabular expression statement. To run a working example of time_window_rolling_avg_fl()
, see Example.
let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
tbl_ex
| partition hint.strategy=shuffle by key
(
extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
| mv-expand timestamp to typeof(datetime), delta to typeof(long)
| sort by timestamp asc, delta desc
| scan declare (cum_sum:double=0.0, cum_count:long=0) with
(
step s: true => cum_count = s.cum_count + delta,
cum_sum = s.cum_sum + delta * value;
)
| extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
| where delta == -direction
| project timestamp, value, avg_value, key
)
};
// Write your query to use the function here.
Example
The following example uses the invoke operator to run the function.
To use a query-defined function, invoke it after the embedded function definition.
let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
tbl_ex
| partition hint.strategy=shuffle by key
(
extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
| mv-expand timestamp to typeof(datetime), delta to typeof(long)
| sort by timestamp asc, delta desc
| scan declare (cum_sum:double=0.0, cum_count:long=0) with
(
step s: true => cum_count = s.cum_count + delta,
cum_sum = s.cum_sum + delta * value;
)
| extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
| where delta == -direction
| project timestamp, value, avg_value, key
)
};
let tbl = datatable(ts:datetime, val:real, key:string) [
datetime(8:00), 1, 'Device1',
datetime(8:01), 2, 'Device1',
datetime(8:05), 3, 'Device1',
datetime(8:05), 10, 'Device2',
datetime(8:09), 20, 'Device2',
datetime(8:40), 4, 'Device1',
datetime(9:00), 5, 'Device1',
datetime(9:01), 6, 'Device1',
datetime(9:05), 30, 'Device2',
datetime(9:50), 7, 'Device1'
];
tbl
| invoke time_window_rolling_avg_fl('ts', 'val', 'key', 10m)
Output
timestamp | value | avg_value | key |
---|---|---|---|
2021-11-29 08:05:00.0000000 | 10 | 10 | Device2 |
2021-11-29 08:09:00.0000000 | 20 | 15 | Device2 |
2021-11-29 09:05:00.0000000 | 30 | 30 | Device2 |
2021-11-29 08:00:00.0000000 | 1 | 1 | Device1 |
2021-11-29 08:01:00.0000000 | 2 | 1.5 | Device1 |
2021-11-29 08:05:00.0000000 | 3 | 2 | Device1 |
2021-11-29 08:40:00.0000000 | 4 | 4 | Device1 |
2021-11-29 09:00:00.0000000 | 5 | 5 | Device1 |
2021-11-29 09:01:00.0000000 | 6 | 5.5 | Device1 |
2021-11-29 09:50:00.0000000 | 7 | 7 | Device1 |
The first value (10) at 8:05 contains only a single value, which fell in the 10-minute backward window, the second value (15) is the average of two samples at 8:09 and at 8:05, etc.