配方:Azure AI 服務 - 多重變數異常偵測
此配方說明如何在 Apache Spark 上使用 SynapseML 和 Azure AI 服務來進行多重異常偵測。 多重變數異常偵測允許偵測許多變數或時間序列之間的異常,並考慮不同變數之間的所有相互關聯和相依性。 在此案例中,我們使用 SynapseML 來使用 Azure AI 服務來定型多變數異常偵測的模型,然後使用 模型來推斷數據集內包含三個 IoT 感測器綜合測量的多重變數異常。
重要
從 2023 年 9 月 20 日開始,您將無法建立新的 異常偵測程式 資源。 異常偵測程式 服務將於 2026 年 10 月 1 日淘汰。
若要深入瞭解 Azure AI 異常偵測程式,請參閱本文件頁面。
必要條件
- Azure 訂用帳戶 - 免費建立一個訂用帳戶
- 將筆記本附加至 Lakehouse。 在左側,選取 [新增 ] 以新增現有的 Lakehouse 或建立 Lakehouse。
設定
請遵循指示,Anomaly Detector
使用 Azure 入口網站 建立資源,或者您也可以使用 Azure CLI 來建立此資源。
設定 Anomaly Detector
之後,您可以探索處理各種表單資料的方法。 Azure AI 內的服務類別目錄提供數個選項:視覺、語音、語言、Web 搜尋、決策、翻譯和文件智慧。
建立異常偵測器資源
- 在 Azure 入口網站 中,選取資源群組中的 [建立],然後輸入 異常偵測程式。 選取 異常偵測程式 資源。
- 為資源指定名稱,在理想情況下,請使用與資源群組的其餘區域相同的區域。 使用其餘項目的預設選項,然後選取 [檢閱 + 建立],然後選取 [建立]。
- 建立 異常偵測程式 資源之後,請開啟資源,然後選取
Keys and Endpoints
左側導覽中的面板。 將 異常偵測程式 資源的ANOMALY_API_KEY
金鑰複製到環境變數中,或將它儲存在變數中anomalyKey
。
建立 儲存體 帳戶資源
若要儲存中繼數據,您必須建立 Azure Blob 儲存體 帳戶。 在該記憶體帳戶內,建立用來儲存中繼數據的容器。 記下容器名稱,並將 連接字串 複製到該容器。 您稍後需要它來填入 containerName
變數和 BLOB_CONNECTION_STRING
環境變數。
輸入您的服務金鑰
讓我們從設定服務金鑰的環境變數開始。 下一個數據格會根據 Azure 金鑰保存庫 中所儲存的值來設定 ANOMALY_API_KEY
和 BLOB_CONNECTION_STRING
環境變數。 如果您在自己的環境中執行本教學課程,請務必先設定這些環境變數,再繼續進行。
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
現在,讓我們讀取 ANOMALY_API_KEY
和 BLOB_CONNECTION_STRING
環境變數,並設定 containerName
和 location
變數。
# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
"wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"
首先,我們會連線到記憶體帳戶,讓異常偵測器可以儲存該處的中繼結果:
spark.sparkContext._jsc.hadoopConfiguration().set(
f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)
讓我們匯入所有必要的模組。
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.cognitive import *
現在,讓我們將範例數據讀入 Spark DataFrame。
df = (
spark.read.format("csv")
.option("header", "true")
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)
df = (
df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)
# Let's inspect the dataframe:
df.show(5)
我們現在可以建立 estimator
物件,用來定型模型。 我們會指定定型數據的開始和結束時間。 我們也指定要使用的輸入數據行,以及包含時間戳的數據行名稱。 最後,我們會指定要在異常偵測滑動視窗中使用的數據點數目,並將 連接字串 設定為 Azure Blob 儲存體 帳戶。
trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
estimator = (
FitMultivariateAnomaly()
.setSubscriptionKey(anomalyKey)
.setLocation(location)
.setStartTime(trainingStartTime)
.setEndTime(trainingEndTime)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
)
現在我們已建立 estimator
,讓我們將其符合數據:
model = estimator.fit(df)
```parameter
Once the training is done, we can now use the model for inference. The code in the next cell specifies the start and end times for the data we would like to detect the anomalies in.
```python
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"
result = (
model.setStartTime(inferenceStartTime)
.setEndTime(inferenceEndTime)
.setOutputCol("results")
.setErrorCol("errors")
.setInputCols(inputColumns)
.setTimestampCol(timestampColumn)
.transform(df)
)
result.show(5)
當我們在上一個儲存格中呼叫 .show(5)
時,它會顯示數據框架中的前五個數據列。 結果全 null
都是因為它們不在推斷視窗內。
若要只顯示推斷數據的結果,請選取我們需要的數據行。 然後,我們可以依遞增順序排序數據框架中的數據列,並篩選結果,只顯示推斷視窗範圍內的數據列。 在我們的案例 inferenceEndTime
中,與數據框架中的最後一個數據列相同,因此可以忽略這一點。
最後,為了能夠更妥善地繪製結果,可讓Spark數據框架轉換成 Pandas 數據框架。
rdf = (
result.select(
"timestamp",
*inputColumns,
"results.contributors",
"results.isAnomaly",
"results.severity"
)
.orderBy("timestamp", ascending=True)
.filter(col("timestamp") >= lit(inferenceStartTime))
.toPandas()
)
rdf
將 contributors
每個感測器的貢獻分數儲存到偵測到異常的數據行格式。 下一個數據格會格式化此數據,並將每個感測器的貢獻分數分割成自己的數據行。
def parse(x):
if type(x) is list:
return dict([item[::-1] for item in x])
else:
return {"series_0": 0, "series_1": 0, "series_2": 0}
rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
[rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf
太棒了! 我們現在在、 series_1
和 series_2
數據行中series_0
分別具有感測器 1、2 和 3 的貢獻分數。
執行下一個儲存格來繪製結果。 參數 minSeverity
會指定要繪製之異常的最小嚴重性。
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
rdf["timestamp"],
rdf["sensor_1"],
color="tab:orange",
linestyle="solid",
linewidth=2,
label="sensor_1",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_2"],
color="tab:green",
linestyle="solid",
linewidth=2,
label="sensor_2",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_3"],
color="tab:blue",
linestyle="solid",
linewidth=2,
label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)
plt.legend()
plt.title(
"A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
rdf["timestamp"],
rdf["severity"],
color="black",
linestyle="solid",
linewidth=2,
label="Severity score",
)
plt.plot(
rdf["timestamp"],
[minSeverity] * len(rdf["severity"]),
color="red",
linestyle="dotted",
linewidth=1,
label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
rdf["timestamp"],
rdf["series_1"],
width=2,
color="tab:green",
label="sensor_2",
bottom=rdf["series_0"],
)
plt.bar(
rdf["timestamp"],
rdf["series_2"],
width=2,
color="tab:blue",
label="sensor_3",
bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
這些繪圖會以橙色、綠色和藍色顯示感測器(在推斷視窗內)的原始數據。 第一個圖中的紅色垂直線顯示偵測到的異常,其嚴重性大於或等於 minSeverity
。
第二個繪圖顯示所有偵測到異常的嚴重性分數,且 minSeverity
閾值會顯示在虛線紅色線條中。
最後,最後一個繪圖會顯示每個感測器的數據對偵測到異常的貢獻。 它可協助我們診斷和瞭解每個異常最有可能的原因。
相關內容
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應