log_reduce_predict_fl()
Applies to: ✅ Microsoft Fabric ✅ Azure Data Explorer
The function log_reduce_predict_fl()
parses semi structured textual columns, such as log lines, and for each line it matches the respective pattern from a pretrained model or reports an anomaly if no matching pattern was found. The function's' output is similar to log_reduce_fl(), though the patterns are retrieved from a pretrained model that generated by log_reduce_train_fl().
Prerequisites
- The Python plugin must be enabled on the cluster. This is required for the inline Python used in the function.
- The Python plugin must be enabled on the database. This is required for the inline Python used in the function.
Syntax
T |
invoke
log_reduce_predict_fl(
models_tbl,
model_name,
reduce_col [,
anomaly_str ])
Learn more about syntax conventions.
Parameters
Name | Type | Required | Description |
---|---|---|---|
models_tbl | table | ✔️ | A table containing models generated by log_reduce_train_fl(). The table's schema should be (name:string, timestamp: datetime, model:string). |
model_name | string |
✔️ | The name of the model that will be retrieved from models_tbl. If the table contains few models matching the model name, the latest one is used. |
reduce_col | string |
✔️ | The name of the string column the function is applied to. |
anomaly_str | string |
This string is output for lines that have no matched pattern in the model. Default value is "ANOMALY". |
Function definition
You can define the function by either embedding its code as a query-defined function, or creating it as a stored function in your database, as follows:
Define the function using the following let statement. No permissions are required.
Important
A let statement can't run on its own. It must be followed by a tabular expression statement. To run a working example of log_reduce_fl()
, see Example.
let log_reduce_predict_fl=(tbl:(*), models_tbl: (name:string, timestamp: datetime, model:string),
model_name:string, reduce_col:string, anomaly_str: string = 'ANOMALY')
{
let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
let kwargs = bag_pack('logs_col', reduce_col, 'output_patterns_col', 'LogReduce','output_parameters_col', '',
'model', model_str, 'anomaly_str', anomaly_str, 'output_type', 'summary');
let code = ```if 1:
from log_cluster import log_reduce_predict
result = log_reduce_predict.log_reduce_predict(df, kargs)
```;
tbl
| evaluate hint.distribution=per_node python(typeof(Count:int, LogReduce:string, example:string), code, kwargs)
};
// Write your query to use the function here.
Example
The following example uses the invoke operator to run the function.
To use a query-defined function, invoke it after the embedded function definition.
let log_reduce_predict_fl=(tbl:(*), models_tbl: (name:string, timestamp: datetime, model:string),
model_name:string, reduce_col:string, anomaly_str: string = 'ANOMALY')
{
let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
let kwargs = bag_pack('logs_col', reduce_col, 'output_patterns_col', 'LogReduce','output_parameters_col', '',
'model', model_str, 'anomaly_str', anomaly_str, 'output_type', 'summary');
let code = ```if 1:
from log_cluster import log_reduce_predict
result = log_reduce_predict.log_reduce_predict(df, kargs)
```;
tbl
| evaluate hint.distribution=per_node python(typeof(Count:int, LogReduce:string, example:string), code, kwargs)
};
HDFS_log_100k
| take 1000
| invoke log_reduce_predict_fl(models_tbl=ML_Models, model_name="HDFS_100K", reduce_col="data")
Output
Count | LogReduce | example |
---|---|---|
239 | 081110 | <NUM> <NUM> INFO dfs.DataNode$DataXceiver: Receiving block blk_<NUM> src: <IP> dest: <IP> 081110 215858 15494 INFO dfs.DataNode$DataXceiver: Receiving block blk_-7037346755429293022 src: /10.251.43.21:45933 dest: /10.251.43.21:50010 |
231 | 081110 | <NUM> <NUM> INFO dfs.DataNode$PacketResponder: Received block blk_<NUM> of size <NUM> from <IP> 081110 215858 15485 INFO dfs.DataNode$PacketResponder: Received block blk_5080254298708411681 of size 67108864 from /10.251.43.21 |
230 | 081110 | <NUM> <NUM> INFO dfs.DataNode$PacketResponder: PacketResponder <NUM> for block blk_<NUM> terminating 081110 215858 15496 INFO dfs.DataNode$PacketResponder: PacketResponder 2 for block blk_-7746692545918257727 terminating |
218 | 081110 | <NUM> <NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.addStoredBlock: blockMap updated: <IP> is added to blk_<NUM> size <NUM> 081110 215858 27 INFO dfs.FSNamesystem: BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.250.11.85:50010 is added to blk_5080254298708411681 size 67108864 |
79 | 081110 | <NUM> <NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: <>. <> 081110 215858 26 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /user/root/rand3/_temporary/task_200811101024_0005_m_001805_0/part-01805. blk-7037346755429293022 |
3 | 081110 | <NUM> <NUM> INFO dfs.DataBlockScanner: Verification succeeded for <*> 081110 215859 13 INFO dfs.DataBlockScanner: Verification succeeded for blk_-7244926816084627474 |