Recept: Cognitive Services – Többváltozós anomáliadetektálás
Ez a recept bemutatja, hogyan használhatja a SynapseML-t és az Azure Cognitive Servicest az Apache Sparkon a többváltozós anomáliadetektáláshoz. A többváltozós anomáliadetektálás lehetővé teszi a rendellenességek észlelését számos változó vagy időzón között, figyelembe véve a különböző változók közötti összes korrelációt és függőséget. Ebben a forgatókönyvben a SynapseML használatával tanítunk be egy modellt a többváltozós anomáliadetektáláshoz az Azure Cognitive Services használatával, majd a modell használatával többváltozós anomáliákat következtetünk egy három IoT-érzékelőből származó szintetikus méréseket tartalmazó adathalmazon belül.
A anomáliadetektor Cognitive Service-ről további információt ebben a dokumentációs oldalon talál.
Előfeltételek
- Azure-előfizetés – Ingyenes létrehozása
- Csatolja a jegyzetfüzetet egy tótárházhoz. A bal oldalon válassza a Hozzáadás lehetőséget egy meglévő tóház hozzáadásához vagy egy tóház létrehozásához.
Telepítés
Anomáliadetektor-erőforrás létrehozása
Kövesse az utasításokat egy Anomaly Detector
erőforrás létrehozásához a Azure Portal vagy másik lehetőségként az Azure CLI-vel is létrehozhatja ezt az erőforrást.
- A Azure Portal kattintson
Create
az erőforráscsoportba, majd írja be a következőtAnomaly Detector
: . Kattintson a anomáliadetektor erőforrásra. - Nevezze el az erőforrást, és ideális esetben ugyanazt a régiót használja, mint az erőforráscsoport többi része. Használja az alapértelmezett beállításokat a többihez, majd kattintson
Review + Create
a gombra, majd a parancsraCreate
. - A anomáliadetektor erőforrás létrehozása után nyissa meg, és kattintson a
Keys and Endpoints
bal oldali panelre. Másolja a anomáliadetektor erőforrás kulcsát aANOMALY_API_KEY
környezeti változóba, vagy tárolja aanomalyKey
változóban.
Tárfiók-erőforrás létrehozása
A köztes adatok mentéséhez létre kell hoznia egy Azure Blob Storage-fiókot. Ebben a tárfiókban hozzon létre egy tárolót a köztes adatok tárolásához. Jegyezze fel a tároló nevét, és másolja a kapcsolati sztring a tárolóba. Később szüksége lesz rá a változó és a containerName
környezeti változó feltöltéséhez BLOB_CONNECTION_STRING
.
Adja meg a szolgáltatáskulcsokat
Kezdjük a szolgáltatáskulcsok környezeti változóinak beállításával. A következő cella az ANOMALY_API_KEY
Azure Key Vault tárolt értékek alapján állítja be a és a BLOB_CONNECTION_STRING
környezeti változót. Ha ezt az oktatóanyagot a saját környezetében futtatja, a folytatás előtt mindenképpen állítsa be ezeket a környezeti változókat.
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Most olvassa be a és BLOB_CONNECTION_STRING
a ANOMALY_API_KEY
környezeti változókat, és állítsa be a és location
a containerName
változókat.
# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
"wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"
Először csatlakozunk a tárfiókunkhoz, hogy az anomáliadetektor menthesse a köztes eredményeket:
spark.sparkContext._jsc.hadoopConfiguration().set(
f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)
Importáljuk az összes szükséges modult.
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.cognitive import *
Most olvassuk be a mintaadatokat egy Spark DataFrame-be.
df = (
spark.read.format("csv")
.option("header", "true")
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)
df = (
df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)
# Let's inspect the dataframe:
df.show(5)
Most már létrehozhatunk egy estimator
objektumot, amelyet a modell betanításakor használunk. A betanítási adatok kezdési és befejezési időpontjai meg lesznek adva. A használni kívánt bemeneti oszlopokat és az időbélyegeket tartalmazó oszlop nevét is meg kell adni. Végül megszabjuk az anomáliadetektálási csúszóablakban használandó adatpontok számát, és a kapcsolati sztring a Azure Blob Storage Fiók értékre állítjuk.
trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
estimator = (
FitMultivariateAnomaly()
.setSubscriptionKey(anomalyKey)
.setLocation(location)
.setStartTime(trainingStartTime)
.setEndTime(trainingEndTime)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
)
Most, hogy létrehoztuk a -t estimator
, illesztsük be az adatokhoz:
model = estimator.fit(df)
A betanítás befejezése után már használhatjuk a modellt a következtetéshez. A következő cellában lévő kód határozza meg azoknak az adatoknak a kezdési és befejezési idejét, amelyekben észlelni szeretnénk az anomáliákat.
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"
result = (
model.setStartTime(inferenceStartTime)
.setEndTime(inferenceEndTime)
.setOutputCol("results")
.setErrorCol("errors")
.setInputCols(inputColumns)
.setTimestampCol(timestampColumn)
.transform(df)
)
result.show(5)
Amikor meghívtuk .show(5)
az előző cellában, az az adatkeret első öt sorát mutatta meg. Az eredmények mind null
azért voltak, mert nem voltak a következtetési ablakban.
Ha csak a kikövetkeztetett adatok eredményeit szeretné megjeleníteni, jelölje ki a szükséges oszlopokat. Ezután növekvő sorrendben rendezhetjük az adatkeret sorait, és szűrhetjük az eredményt, hogy csak a következtetési ablak tartományában lévő sorokat jelenítsük meg. Esetünkben inferenceEndTime
ugyanaz, mint az adatkeret utolsó sora, ezért ezt figyelmen kívül hagyhatja.
Végül az eredmények jobb ábrázolása érdekében alakítsa át a Spark-adatkeretet Pandas-adatkeretté.
rdf = (
result.select(
"timestamp",
*inputColumns,
"results.contributors",
"results.isAnomaly",
"results.severity"
)
.orderBy("timestamp", ascending=True)
.filter(col("timestamp") >= lit(inferenceStartTime))
.toPandas()
)
rdf
Most formázza azt az oszlopot, amely az contributors
egyes érzékelők hozzájárulási pontszámát tárolja az észlelt anomáliákhoz. A következő cella formázja ezeket az adatokat, és felosztja az egyes érzékelők hozzájárulási pontszámát a saját oszlopára.
def parse(x):
if type(x) is list:
return dict([item[::-1] for item in x])
else:
return {"series_0": 0, "series_1": 0, "series_2": 0}
rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
[rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf
Nagyszerű! Most már rendelkezünk az 1, 2 és 3 érzékelők hozzájárulási pontszámával a series_0
, series_1
és series_2
oszlopban.
Futtassa a következő cellát az eredmények ábrázolásához. Az minSeverity
első sor paramétere határozza meg a ábrázolni kívánt anomáliák minimális súlyosságát.
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
rdf["timestamp"],
rdf["sensor_1"],
color="tab:orange",
linestyle="solid",
linewidth=2,
label="sensor_1",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_2"],
color="tab:green",
linestyle="solid",
linewidth=2,
label="sensor_2",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_3"],
color="tab:blue",
linestyle="solid",
linewidth=2,
label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)
plt.legend()
plt.title(
"A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
rdf["timestamp"],
rdf["severity"],
color="black",
linestyle="solid",
linewidth=2,
label="Severity score",
)
plt.plot(
rdf["timestamp"],
[minSeverity] * len(rdf["severity"]),
color="red",
linestyle="dotted",
linewidth=1,
label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
rdf["timestamp"],
rdf["series_1"],
width=2,
color="tab:green",
label="sensor_2",
bottom=rdf["series_0"],
)
plt.bar(
rdf["timestamp"],
rdf["series_2"],
width=2,
color="tab:blue",
label="sensor_3",
bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
A diagramok narancssárga, zöld és kék színnel jelenítik meg az érzékelők nyers adatait (a következtetési ablakban belül). Az első ábrán a piros függőleges vonalak az észlelt anomáliákat mutatják, amelyek súlyossága nagyobb vagy egyenlő.minSeverity
A második ábrán az összes észlelt rendellenesség súlyossági pontszáma látható, a minSeverity
küszöbérték pedig a pontozott piros vonalban látható.
Végül az utolsó ábra az egyes érzékelők adatainak az észlelt anomáliákhoz való hozzájárulását mutatja. Segít diagnosztizálni és megérteni az egyes anomáliák legvalószínűbb okát.