配方:Azure AI 服務 - 多變量異常偵測器
此配方說明如何搭配使用 SynapseML 和 Apache Spark 上的 Azure AI 服務,進行多變量異常偵測。 多變量異常偵測允許偵測許多變數或時間序列之間的異常狀況,並考慮不同變數之間的所有相互關聯和相依性。 在此案例中,我們使用 SynapseML 來訓練模型,其中該模型會使用 Azure AI 服務來進行多變量異常偵測;然後,我們會使用該模型來推斷資料集內包含三個 IoT 感應器綜合測量的多變量異常狀況。
重要
從 2023 年 9 月 20 日起,您將無法建立新的異常偵測器資源。 異常偵測器服務將於 2026 年 10 月 1 日淘汰。
如需深入了解 Azure AI 異常偵測程式,請參閱本文件頁面文件。
必要條件
- Azure 訂用帳戶 - 建立免費帳戶
- 將筆記本連結至 Lakehouse。 在左側,選取 [新增],以新增現有的 Lakehouse 或建立 Lakehouse。
設定
請遵循指示,使用 Azure 入口網站建立 Anomaly Detector
資源,或者您也可以使用 Azure CLI 來建立此資源。
設定 Anomaly Detector
之後,您可以探索處理各種表單資料的方法。 Azure AI 內的服務目錄可提供數個選項:視覺、語音、語言、Web 搜尋、決策、翻譯和文件智慧。
建立異常偵測器資源
- 在 Azure 入口網站中,選取資源群組中的 [建立],然後輸入 [異常偵測程式]。 選取異常偵測程式資源。
- 為資源指定名稱,並且在理想情況下,請使用與其餘資源群組相同的區域。 其餘部分使用預設選項,然後選取 [檢閱 + 建立],再然後選取 [建立]。
- 建立 [異常偵測程式資源] 之後,將其開啟,然後選取左側瀏覽窗格中的
Keys and Endpoints
面板。 將異常偵測程式資源的金鑰複製到ANOMALY_API_KEY
環境變數中,或將其儲存在anomalyKey
變數中。
建立儲存體帳戶資源
若要儲存中繼資料,您需要建立 Azure Blob 儲存體帳戶。 在該儲存體帳戶內,建立用來儲存中繼資料的容器。 記下容器名稱,並將連接字串複製到該容器。 您稍後需要用來填入 containerName
變數和 BLOB_CONNECTION_STRING
環境變數。
輸入您的服務金鑰
讓我們從設定服務金鑰的環境變數開始。 下一個資料格會根據 Azure Key Vault 中儲存的值來設定 ANOMALY_API_KEY
和 BLOB_CONNECTION_STRING
環境變數。 如果在自己的環境中執行本教學課程,務必先設定這些環境變數,再繼續進行。
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
現在,讓我們讀取 ANOMALY_API_KEY
和 BLOB_CONNECTION_STRING
環境變數,再設定 containerName
和 location
變數。
# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
"wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"
首先,我們會連線到儲存體帳戶,讓異常偵測器可以儲存該處的中繼結果:
spark.sparkContext._jsc.hadoopConfiguration().set(
f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)
讓我們匯入所有必要的模組。
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.cognitive import *
現在,讓我們將範例資料讀入 Spark DataFrame。
df = (
spark.read.format("csv")
.option("header", "true")
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)
df = (
df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)
# Let's inspect the dataframe:
df.show(5)
我們現在可以建立用於訓練模型的 estimator
物件。 我們會指定訓練資料的開始和結束時間。 我們也會指定要使用的輸入資料行,以及包含時間戳記的資料行名稱。 最後,我們會指定要在異常偵測滑動視窗中使用的資料點數目,並將連接字串設定為 Azure Blob 儲存體帳戶。
trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
estimator = (
FitMultivariateAnomaly()
.setSubscriptionKey(anomalyKey)
.setLocation(location)
.setStartTime(trainingStartTime)
.setEndTime(trainingEndTime)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
)
現在我們建立了 estimator
,讓我們將其擬合到資料中:
model = estimator.fit(df)
```parameter
Once the training is done, we can now use the model for inference. The code in the next cell specifies the start and end times for the data we would like to detect the anomalies in.
```python
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"
result = (
model.setStartTime(inferenceStartTime)
.setEndTime(inferenceEndTime)
.setOutputCol("results")
.setErrorCol("errors")
.setInputCols(inputColumns)
.setTimestampCol(timestampColumn)
.transform(df)
)
result.show(5)
當我們在上一個資料格中呼叫 .show(5)
時,它會顯示 DataFrame 中的前五個資料列。 結果全都是 null
,因為它們不在推斷視窗內。
若要只顯示推斷資料的結果,請選取我們需要的資料行。 然後,我們可以依遞增排序 DataFrame 中的資料列,並篩選結果,只顯示推斷視窗範圍內的資料列。 在我們的案例 inferenceEndTime
中,與 DataFrame 中的最後一個資料列相同,因此可以忽略這一點。
最後,為了能夠更妥善地繪製結果,可讓 Spark DataFrame 轉換成 Pandas DataFrame。
rdf = (
result.select(
"timestamp",
*inputColumns,
"results.contributors",
"results.isAnomaly",
"results.severity"
)
.orderBy("timestamp", ascending=True)
.filter(col("timestamp") >= lit(inferenceStartTime))
.toPandas()
)
rdf
格式化 contributors
資料行,以將貢獻分數從每個感應器儲存到偵測到的異常。 下一個資料格會格式化此資料,並將每個感應器的貢獻分數分割成自己的資料行。
def parse(x):
if type(x) is list:
return dict([item[::-1] for item in x])
else:
return {"series_0": 0, "series_1": 0, "series_2": 0}
rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
[rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf
太棒了! 我們現在在 series_0
、series_1
和 series_2
資料行中分別具有感測器 1、2 和 3 的貢獻分數。
執行下一個資料格以繪製結果。 minSeverity
參數會指定要繪製之異常的最小嚴重性。
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
rdf["timestamp"],
rdf["sensor_1"],
color="tab:orange",
linestyle="solid",
linewidth=2,
label="sensor_1",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_2"],
color="tab:green",
linestyle="solid",
linewidth=2,
label="sensor_2",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_3"],
color="tab:blue",
linestyle="solid",
linewidth=2,
label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)
plt.legend()
plt.title(
"A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
rdf["timestamp"],
rdf["severity"],
color="black",
linestyle="solid",
linewidth=2,
label="Severity score",
)
plt.plot(
rdf["timestamp"],
[minSeverity] * len(rdf["severity"]),
color="red",
linestyle="dotted",
linewidth=1,
label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
rdf["timestamp"],
rdf["series_1"],
width=2,
color="tab:green",
label="sensor_2",
bottom=rdf["series_0"],
)
plt.bar(
rdf["timestamp"],
rdf["series_2"],
width=2,
color="tab:blue",
label="sensor_3",
bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
這些繪圖會以橙色、綠色和藍色顯示感測器 (在推斷視窗內) 的未經處理資料。 第一個圖中的紅色垂直線顯示所偵測到的異常,其嚴重性大於或等於 minSeverity
。
第二個繪圖顯示了所有偵測到異常的嚴重性分數,且 minSeverity
閾值會顯示在虛線紅色線條中。
最後,最後一個繪圖會顯示每個感測器的資料對偵測到異常的貢獻。 它可協助我們診斷和了解每個異常最有可能的原因。