多变量异常情况检测

有关实时智能中的多变量异常情况检测的一般信息,请参阅 Microsoft Fabric 中的多变量异常情况检测 - 概述。 在本教程中,你将使用示例数据在 Python 笔记本中使用 Spark 引擎训练多变量异常情况检测模型。 然后,你将通过使用 Eventhouse 引擎将训练的模型应用到新数据来预测异常情况。 前几个步骤设置环境,以下步骤训练模型并预测异常情况。

先决条件

  • 具有已启用 Microsoft Fabric 的容量工作区
  • 工作区中的“管理员”、“参与者”或“成员”角色。 创建环境等项目需要此权限级别。
  • 具有数据库的工作区中的 eventhouse
  • 从 GitHub 存储库下载示例数据
  • 从 GitHub 存储库下载笔记本

第 1 部分 - 启用 OneLake 可用性

必须先启用 OneLake 可用性,然后才能在 Eventhouse 中获取数据。 此步骤非常重要,因为它使引入的数据能够在 OneLake 中可用。 在后面的步骤中,你可以从 Spark Notebook 访问此相同数据来训练模型。

  1. 在实时智能中浏览到工作区主页。

  2. 选择在先决条件部分中创建的 Eventhouse。 选择要在其中存储数据的数据库。

  3. 在“数据库详细信息”磁贴,选择“OneLake 可用性”旁边的“铅笔”图标

  4. 在右窗格中,将按钮切换为“活动”

  5. 选择“完成” 。

    在 Eventhouse 中启用 OneLake 可用性的屏幕截图。

第 2 部分 - 启用 KQL Python 插件

在此步骤中,你将在 Eventhouse 中启用 python 插件。 此步骤需要在 KQL 查询集中运行预测异常 Python 代码。 请务必选择包含时序异常情况检测器包的正确包。

  1. 在 Eventhouse 屏幕中,选择数据库,然后从功能区中选择“管理”>“插件”

  2. 在“插件”窗格中,将 Python 语言扩展切换到“打开”

  3. 选择 Python 3.11.7 DL(预览版)

  4. 选择“完成” 。

    有关如何在 Eventhouse 中启用 python 包 3.11.7 DL 的屏幕截图。

第 3 部分 - 创建 Spark 环境

在此步骤中,你将创建一个 Spark 环境来运行 Python 笔记本,该笔记本使用 Spark 引擎训练多变量异常情况检测模型。 有关创建环境的详细信息,请参阅创建和管理环境

  1. 在体验切换器中,选择“数据工程”。 如果已处于数据工程师体验中,请浏览到“主页”

  2. 从“要创建的建议项目”中、选择**“环境”,然后输入环境的名称 MVAD_ENV

    在数据工程师中创建环境的屏幕截图。

  3. 在“库”下,选择“公共库”

  4. 选择“从 PyPI 添加”

  5. 在搜索框中,输入“时序异常情况检测器”。 版本会自动填充最新版本。 本教程是使用版本 0.2.7 创建的,这是 Kusto Python 3.11.7 DL 中包含的版本。

  6. 选择“保存”。

    将 PyPI 包添加到 Spark 环境的屏幕截图。

  7. 选择环境中的“主页”选项卡。

  8. 从功能区中,选择“发布”图标。

  9. 选择“全部发布”。 此步骤可能需要几分钟才能完成。

    发布环境的屏幕截图。

第 4 部分- 将数据获取到 Eventhouse

  1. 将鼠标悬停在要在其中存储数据的 KQL 数据库上。 选择“更多菜单 [...]”>“获取数据”>“本地文件”

    从本地文件获取数据的屏幕截图。

  2. 选择“+ 新建表”,然后输入 demo_stocks_change 作为表名。

  3. 在“上传数据”对话框中,选择“浏览文件”并上传“先决条件”中下载的示例数据文件

  4. 选择下一步

  5. 在“检查数据”部分中,将“第一行为列标头”切换为“打开”

  6. 选择“完成”。

  7. 上传数据后,选择“关闭”

第 5 部分 - 将 OneLake 路径复制到表

请确保选择 demo_stocks_change 表。 在“表详细信息”磁贴中,选择“复制路径”,以将 OneLake 路径复制到剪贴板。 将此复制的文本保存在文本编辑器的某个位置,以供后续步骤使用。

复制 OneLake 路径的屏幕截图。

第 6 部分 - 准备笔记本

  1. 在体验切换器中,选择“数据工程”。

  2. 在“要创建的建议的项目”下,选择“导入笔记本”。 可能需要向右滚动才能找到此选项。

  3. 选择“上传”,然后选择在“先决条件”中下载的笔记本。

  4. 上传笔记本后,浏览到工作区并打开笔记本。

  5. 在顶部功能区中,选择“工作区默认”下拉列表,然后选择在上一步中创建的环境。

    在笔记本中选择环境的屏幕截图。

第 7 部分 - 运行笔记本

  1. 导入标准包。

    import numpy as np
    import pandas as pd
    
  2. Spark 需要 ABFSS URI 才能安全地连接到 OneLake 存储,因此下一步将定义此函数以将 OneLake URI 转换为 ABFSS URI。

    def convert_onelake_to_abfss(onelake_uri):
    if not onelake_uri.startswith('https://'):
        raise ValueError("Invalid OneLake URI. It should start with 'https://'.")
    uri_without_scheme = onelake_uri[8:]
    parts = uri_without_scheme.split('/')
    if len(parts) < 3:
        raise ValueError("Invalid OneLake URI format.")
    account_name = parts[0].split('.')[0]
    container_name = parts[1]
    path = '/'.join(parts[2:])
    abfss_uri = f"abfss://{container_name}@{parts[0]}/{path}"
    return abfss_uri
    
  3. 输入从第 5 部分 - 将 OneLake 路径复制到表中复制的 OneLake URI,以便将 demo_stocks_change 表加载到 pandas 数据帧中。

    onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI 
    abfss_uri = convert_onelake_to_abfss(onelake_uri)
    print(abfss_uri)
    
    df = spark.read.format('delta').load(abfss_uri)
    df = df.toPandas().set_index('Date')
    print(df.shape)
    df[:3]
    
  4. 运行以下单元格来准备训练和预测数据帧。

    注意

    实际预测将由 Eventhouse 在第 9 部分 - Predict-anomalies-in-the-kql-queryset 中的数据上运行。 在生产应用场景中,如果要将数据流式传输到 Eventhouse,则会对新的流数据进行预测。 在本教程中,数据集已按日期拆分为两个部分以进行训练和预测。 这样做是为了模拟历史数据和新的流数据。

    features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
    cutoff_date = pd.to_datetime('2023-01-01')
    
    train_df = df[df.Date < cutoff_date]
    print(train_df.shape)
    train_df[:3]
    
    train_len = len(train_df)
    predict_len = len(df) - train_len
    print(f'Total samples: {len(df)}. Split to {train_len} for training, {predict_len} for testing')
    
  5. 运行单元格以训练模型并将其保存在 Fabric MLflow 模型注册表中。

    import mlflow
    from anomaly_detector import MultivariateAnomalyDetector
    model = MultivariateAnomalyDetector()
    
    sliding_window = 200
    param   s = {"sliding_window": sliding_window}
    
    model.fit(train_df, params=params)
    
    with mlflow.start_run():
        mlflow.log_params(params)
        mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")
    
        model_info = mlflow.pyfunc.log_model(
            python_model=model,
            artifact_path="mvad_artifacts",
            registered_model_name="mvad_5_stocks_model",
        )
    
    # Extract the registered model path to be used for prediction using Kusto Python sandbox
    
    mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
    model_abfss = mi.latest_versions[0].source
    print(model_abfss)
    
  6. 从最后一个单元格输出复制模型 URI。 在后面的下一步骤中将要使用此 URI。

第 8 部分 - 设置 KQL 查询集

有关一般信息,请参阅创建 KQL 查询集

  1. 在体验切换器中,选择“实时智能”
  2. 选择工作区。
  3. 选择“+新建项目”>“KQL 查询集”。 输入名称 MultivariateAnomalyDetectionTutorial
  4. 选择创建
  5. 在“OneLake 数据中心”窗口中,选择存储数据的 KQL 数据库。
  6. 选择“连接” 。

第 9 部分 - 预测 KQL 查询集中的异常情况

  1. 复制/粘贴并运行以下“.create-or-alter 函数”查询以定义 predict_fabric_mvad_fl() 存储的函数:

    .create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
    predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
    {
        let s = artifacts_uri;
        let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
                                 'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
                                 'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
        let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
        let code = ```if 1:
            import os
            import shutil
            import mlflow
            model_dir = 'C:/Temp/mvad_model'
            model_data_dir = model_dir + '/data'
            os.mkdir(model_dir)
            shutil.move('C:/Temp/MLmodel', model_dir)
            shutil.move('C:/Temp/conda.yaml', model_dir)
            shutil.move('C:/Temp/requirements.txt', model_dir)
            shutil.move('C:/Temp/python_env.yaml', model_dir)
            shutil.move('C:/Temp/python_model.pkl', model_dir)
            features_cols = kargs["features_cols"]
            trim_result = kargs["trim_result"]
            test_data = df[features_cols]
            model = mlflow.pyfunc.load_model(model_dir)
            predictions = model.predict(test_data)
            predict_result = pd.DataFrame(predictions)
            samples_offset = len(df) - len(predict_result)        # this model doesn't output predictions for the first sliding_window-1 samples
            if trim_result:                                       # trim the prefix samples
                result = df[samples_offset:]
                result.iloc[:,-4:] = predict_result.iloc[:, 1:]   # no need to copy 1st column which is the timestamp index
            else:
                result = df                                       # output all samples
                result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
            ```;
        samples
        | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
    }
    
  2. 复制/粘贴以下预测查询。

    1. 替换步骤 7 末尾复制的输出模型 URI。
    2. 运行查询。 它将根据训练的模型检测五个股票的多变量异常情况,并将结果呈现为 anomalychart。 异常点呈现在第一个股票上 (AAPL),尽管它们表示多变量异常情况(换句话说,五只股票在特定日期的联合变化的异常)。
    let cutoff_date=datetime(2023-01-01);
    let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count);   //  number of latest points to predict
    let sliding_window=200;                                                                 //  should match the window that was set for model training
    let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
    let num_samples = prefix_score_len + num_predictions;
    demo_stocks_change
    | top num_samples by Date desc 
    | order by Date asc
    | extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
    | invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
                // NOTE: Update artifacts_uri to model path
                artifacts_uri='enter your model URI here',
                trim_result=true)
    | summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
    | render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changest in % with Anomalies')
    

产生的异常情况图应如下图所示:

多变量异常情况输出的屏幕截图。

清理资源

完成本教程后,你可以删除你创建的资源,从而避免产生其他成本。 若要删除资源,请执行以下步骤:

  1. 浏览到工作区主页。
  2. 删除在本教程中创建的环境。
  3. 删除在本教程中创建的笔记本。
  4. 删除在本教程中使用的 Eventhouse 或数据库
  5. 删除在本教程中创建的 KQL 查询集。