共用方式為


配方:Azure AI 服務 - 多重變數異常偵測

此配方說明如何在 Apache Spark 上使用 SynapseML 和 Azure AI 服務來進行多重異常偵測。 多重變數異常偵測允許偵測許多變數或時間序列之間的異常,並考慮不同變數之間的所有相互關聯和相依性。 在此案例中,我們使用 SynapseML 來使用 Azure AI 服務來定型多變數異常偵測的模型,然後使用 模型來推斷數據集內包含三個 IoT 感測器綜合測量的多重變數異常。

重要

從 2023 年 9 月 20 日開始,您將無法建立新的 異常偵測程式 資源。 異常偵測程式 服務將於 2026 年 10 月 1 日淘汰。

若要深入瞭解 Azure AI 異常偵測程式,請參閱本文件頁面

必要條件

  • Azure 訂用帳戶 - 免費建立一個訂用帳戶
  • 將筆記本附加至 Lakehouse。 在左側,選取 [新增 ] 以新增現有的 Lakehouse 或建立 Lakehouse。

設定

請遵循指示,Anomaly Detector使用 Azure 入口網站 建立資源,或者您也可以使用 Azure CLI 來建立此資源。

設定 Anomaly Detector之後,您可以探索處理各種表單資料的方法。 Azure AI 內的服務類別目錄提供數個選項:視覺、語音語言Web 搜尋決策翻譯文件智慧

建立異常偵測器資源

  • 在 Azure 入口網站 中,選取資源群組中的 [建立],然後輸入 異常偵測程式。 選取 異常偵測程式 資源。
  • 為資源指定名稱,在理想情況下,請使用與資源群組的其餘區域相同的區域。 使用其餘項目的預設選項,然後選取 [檢閱 + 建立],然後選取 [建立]。
  • 建立 異常偵測程式 資源之後,請開啟資源,然後選取Keys and Endpoints左側導覽中的面板。 將 異常偵測程式 資源的ANOMALY_API_KEY金鑰複製到環境變數中,或將它儲存在變數中anomalyKey

建立 儲存體 帳戶資源

若要儲存中繼數據,您必須建立 Azure Blob 儲存體 帳戶。 在該記憶體帳戶內,建立用來儲存中繼數據的容器。 記下容器名稱,並將 連接字串 複製到該容器。 您稍後需要它來填入 containerName 變數和 BLOB_CONNECTION_STRING 環境變數。

輸入您的服務金鑰

讓我們從設定服務金鑰的環境變數開始。 下一個數據格會根據 Azure 金鑰保存庫 中所儲存的值來設定 ANOMALY_API_KEYBLOB_CONNECTION_STRING 環境變數。 如果您在自己的環境中執行本教學課程,請務必先設定這些環境變數,再繼續進行。

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

現在,讓我們讀取 ANOMALY_API_KEYBLOB_CONNECTION_STRING 環境變數,並設定 containerNamelocation 變數。

# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

首先,我們會連線到記憶體帳戶,讓異常偵測器可以儲存該處的中繼結果:

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

讓我們匯入所有必要的模組。

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.cognitive import *

現在,讓我們將範例數據讀入 Spark DataFrame。

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

我們現在可以建立 estimator 物件,用來定型模型。 我們會指定定型數據的開始和結束時間。 我們也指定要使用的輸入數據行,以及包含時間戳的數據行名稱。 最後,我們會指定要在異常偵測滑動視窗中使用的數據點數目,並將 連接字串 設定為 Azure Blob 儲存體 帳戶。

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

現在我們已建立 estimator,讓我們將其符合數據:

model = estimator.fit(df)
```parameter

Once the training is done, we can now use the model for inference. The code in the next cell specifies the start and end times for the data we would like to detect the anomalies in.

```python
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

當我們在上一個儲存格中呼叫 .show(5) 時,它會顯示數據框架中的前五個數據列。 結果全 null 都是因為它們不在推斷視窗內。

若要只顯示推斷數據的結果,請選取我們需要的數據行。 然後,我們可以依遞增順序排序數據框架中的數據列,並篩選結果,只顯示推斷視窗範圍內的數據列。 在我們的案例 inferenceEndTime 中,與數據框架中的最後一個數據列相同,因此可以忽略這一點。

最後,為了能夠更妥善地繪製結果,可讓Spark數據框架轉換成 Pandas 數據框架。

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

contributors 每個感測器的貢獻分數儲存到偵測到異常的數據行格式。 下一個數據格會格式化此數據,並將每個感測器的貢獻分數分割成自己的數據行。

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}

rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

太棒了! 我們現在在、 series_1series_2 數據行中series_0分別具有感測器 1、2 和 3 的貢獻分數。

執行下一個儲存格來繪製結果。 參數 minSeverity 會指定要繪製之異常的最小嚴重性。

minSeverity = 0.1

####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

Screenshot of multivariate anomaly detection results plot

這些繪圖會以橙色、綠色和藍色顯示感測器(在推斷視窗內)的原始數據。 第一個圖中的紅色垂直線顯示偵測到的異常,其嚴重性大於或等於 minSeverity

第二個繪圖顯示所有偵測到異常的嚴重性分數,且 minSeverity 閾值會顯示在虛線紅色線條中。

最後,最後一個繪圖會顯示每個感測器的數據對偵測到異常的貢獻。 它可協助我們診斷和瞭解每個異常最有可能的原因。