Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tento recept ukazuje, jak používat služby SynapseML a Azure AI v Apache Sparku k detekci anomálií s více proměnnými. Detekce vícevariátních anomálií zahrnuje detekci anomálií mezi mnoha proměnnými nebo časovými řadami, přičemž při účtování všech vzájemných korelací a závislostí mezi různými proměnnými. Tento scénář používá SynapseML a služby Azure AI k trénování modelu pro detekci vícevariát anomálií. Model pak použijeme k odvozování vícevariatických anomálií v datové sadě, která obsahuje syntetická měření ze tří senzorů IoT.
Důležité
Od 20. září 2023 nemůžete vytvářet nové prostředky Detektoru anomálií. Služba Detektoru anomálií bude vyřazena 1. října 2026.
Další informace o detektoru anomálií Azure AI najdete v informačním prostředku Detektoru anomálií .
Požadavky
- Předplatné Azure – Vytvoření předplatného zdarma
- Připojte poznámkový blok k jezeru. Na levé straně vyberte Přidat a přidejte existující jezerní dům nebo vytvořte jezero.
Nastavení
Počínaje existujícím Anomaly Detector prostředkem můžete prozkoumat způsoby zpracování dat různých formulářů. Katalog služeb v rámci Azure AI nabízí několik možností:
Vytvoření prostředku Detektor anomálií
- Na webu Azure Portal vyberte Vytvořit ve skupině prostředků a potom zadejte Detektor anomálií. Vyberte prostředek Detektor anomálií.
- Pojmenujte prostředek a v ideálním případě použijte stejnou oblast jako zbytek vaší skupiny prostředků. Použijte výchozí možnosti pro zbytek a pak vyberte Zkontrolovat a vytvořit a pak Vytvořit.
- Po vytvoření prostředku Detektoru anomálií ho otevřete a vyberte
Keys and Endpointspanel v levém navigačním panelu. Zkopírujte klíč pro Detektor anomálií prostředek doANOMALY_API_KEYproměnné prostředí nebo hoanomalyKeyuložte do proměnné.
Vytvoření prostředku účtu úložiště
Pokud chcete uložit zprostředkující data, musíte vytvořit účet služby Azure Blob Storage. V rámci daného účtu úložiště vytvořte kontejner pro ukládání zprostředkujících dat. Poznamenejte si název kontejneru a zkopírujte připojovací řetězec do tohoto kontejneru. Budete ji potřebovat k pozdějšímu naplnění proměnné containerName a proměnné prostředí BLOB_CONNECTION_STRING.
Zadejte klíče služby.
Nejprve nastavte proměnné prostředí pro klíče našich služeb. Další buňka nastaví proměnné prostředí ANOMALY_API_KEY a BLOB_CONNECTION_STRING na základě hodnot uložených v naší službě Azure Key Vault. Pokud tento kurz spustíte ve vlastním prostředí, nezapomeňte před pokračováním nastavit tyto proměnné prostředí:
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Přečtěte si proměnné prostředí ANOMALY_API_KEY a BLOB_CONNECTION_STRING, a nastavte proměnné containerName a location.
# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
"wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"
Připojte se k našemu účtu úložiště, aby detektor anomálií mohl uložit přechodné výsledky v daném účtu úložiště:
spark.sparkContext._jsc.hadoopConfiguration().set(
f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)
Importujte všechny potřebné moduly:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.cognitive import *
Přečtěte si ukázková data do datového rámce Sparku:
df = (
spark.read.format("csv")
.option("header", "true")
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)
df = (
df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)
# Let's inspect the dataframe:
df.show(5)
Teď můžeme vytvořit estimator objekt, který použijeme k trénování modelu. Určujeme počáteční a koncové časy trénovacích dat. Také určíme vstupní sloupce, které se mají použít, a název sloupce, který obsahuje časová razítka. Nakonec určíme počet datových bodů, které se mají použít v posuvném okně detekce anomálií, a připojovací řetězec nastavíme na účet služby Azure Blob Storage:
trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
estimator = (
FitMultivariateAnomaly()
.setSubscriptionKey(anomalyKey)
.setLocation(location)
.setStartTime(trainingStartTime)
.setEndTime(trainingEndTime)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
)
Pojďme přizpůsobit estimator datům.
model = estimator.fit(df)
Po dokončení trénování můžeme model použít k odvozování. Kód v další buňce určuje počáteční a koncové časy dat, ve kterých bychom chtěli zjistit anomálie:
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"
result = (
model.setStartTime(inferenceStartTime)
.setEndTime(inferenceEndTime)
.setOutputCol("results")
.setErrorCol("errors")
.setInputCols(inputColumns)
.setTimestampCol(timestampColumn)
.transform(df)
)
result.show(5)
V předchozí buňce .show(5) jsme ukázali prvních pět řádků datového rámce. Výsledky byly všechny null, protože přistály mimo okno vyhodnocení.
Pokud chcete zobrazit výsledky jenom pro odvozená data, vyberte požadované sloupce. Potom můžeme řádky v datovém rámci seřadit vzestupně a vyfiltrovat výsledek tak, aby zobrazoval pouze řádky v oblasti okna odvozování.
inferenceEndTime Tady odpovídá poslednímu řádku datového rámce, takže ho můžete ignorovat.
Nakonec pro lepší vykreslení výsledků převeďte datový rámec Sparku na datový rámec Pandas:
rdf = (
result.select(
"timestamp",
*inputColumns,
"results.contributors",
"results.isAnomaly",
"results.severity"
)
.orderBy("timestamp", ascending=True)
.filter(col("timestamp") >= lit(inferenceStartTime))
.toPandas()
)
rdf
Naformátujte contributors sloupec, do kterého se uloží skóre příspěvku z každého senzoru, do detekovaných anomálií. Další buňka to zpracuje a rozdělí skóre příspěvku každého senzoru do vlastního sloupce:
def parse(x):
if type(x) is list:
return dict([item[::-1] for item in x])
else:
return {"series_0": 0, "series_1": 0, "series_2": 0}
rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
[rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf
Nyní máme skóre příspěvků snímačů 1, 2 a 3 v series_0, series_1a series_2 sloupce v uvedeném pořadí.
Pokud chcete zobrazit výsledky, spusťte další buňku. Parametr minSeverity určuje minimální závažnost anomálií, které se mají vykreslit:
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
rdf["timestamp"],
rdf["sensor_1"],
color="tab:orange",
linestyle="solid",
linewidth=2,
label="sensor_1",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_2"],
color="tab:green",
linestyle="solid",
linewidth=2,
label="sensor_2",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_3"],
color="tab:blue",
linestyle="solid",
linewidth=2,
label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)
plt.legend()
plt.title(
"A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
rdf["timestamp"],
rdf["severity"],
color="black",
linestyle="solid",
linewidth=2,
label="Severity score",
)
plt.plot(
rdf["timestamp"],
[minSeverity] * len(rdf["severity"]),
color="red",
linestyle="dotted",
linewidth=1,
label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
rdf["timestamp"],
rdf["series_1"],
width=2,
color="tab:green",
label="sensor_2",
bottom=rdf["series_0"],
)
plt.bar(
rdf["timestamp"],
rdf["series_2"],
width=2,
color="tab:blue",
label="sensor_3",
bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
Grafy zobrazují nezpracovaná data ze senzorů (uvnitř okna odvozování) oranžovou, zelenou a modrou. Červené svislé čáry na prvním obrázku ukazují zjištěné anomálie, které mají závažnost větší nebo rovno minSeverity.
Druhý graf zobrazuje skóre závažnosti všech zjištěných anomálií s minSeverity prahovou hodnotou zobrazenou v tečkované červené čáře.
Nakonec poslední graf ukazuje příspěvek dat z každého senzoru ke zjištěným anomáliím. Pomáhá nám diagnostikovat a pochopit nejpravděpodobnější příčinu každé anomálie.