Recept: Cognitive Services – Detekce anomálií s více proměnnými

Tento recept ukazuje, jak můžete používat SynapseML a Azure Cognitive Services na Apache Sparku k detekci anomálií s více proměnnými. Detekce vícerozměrných anomálií umožňuje detekci anomálií mezi mnoha proměnnými nebo časovými řadami s ohledem na všechny vzájemné korelace a závislosti mezi různými proměnnými. V tomto scénáři použijeme SynapseML k trénování modelu pro detekci anomálií s více proměnnými pomocí služeb Azure Cognitive Services a pak použijeme model k odvozování vícerozměrných anomálií v datové sadě obsahující syntetická měření ze tří senzorů IoT.

Další informace o Detektor anomálií Cognitive Service najdete na této stránce dokumentace.

Požadavky

  • Předplatné Azure – Vytvořte si ho zdarma
  • Připojte poznámkový blok k Lakehouse. Na levé straně vyberte Přidat a přidejte existující lakehouse nebo vytvořte lakehouse.

Nastavení

Vytvoření prostředku Detektor anomálií

Postupujte podle pokynů k vytvoření Anomaly Detector prostředku pomocí Azure Portal nebo můžete k vytvoření tohoto prostředku použít také Azure CLI.

  • V Azure Portal klikněte do Create skupiny prostředků a zadejte Anomaly Detector. Klikněte na prostředek Detektor anomálií.
  • Pojmenujte prostředek a v ideálním případě použijte stejnou oblast jako zbytek skupiny prostředků. Pro zbytek použijte výchozí možnosti a potom klikněte na Review + Create a pak Createna .
  • Po vytvoření prostředku Detektor anomálií ho otevřete a klikněte na Keys and Endpoints panel vlevo. Zkopírujte klíč prostředku Detektor anomálií do ANOMALY_API_KEY proměnné prostředí nebo ho uložte do anomalyKey proměnné .

Vytvoření prostředku účtu úložiště

Abyste mohli ukládat zprostředkující data, musíte vytvořit účet Azure Blob Storage. V rámci účtu úložiště vytvořte kontejner pro ukládání zprostředkujících dat. Poznamenejte si název kontejneru a zkopírujte do tohoto kontejneru připojovací řetězec. Později ho budete potřebovat k naplnění containerName proměnné a BLOB_CONNECTION_STRING proměnné prostředí.

Zadejte své klíče služby.

Začněme nastavením proměnných prostředí pro klíče služby. Další buňka nastaví ANOMALY_API_KEY proměnné prostředí a na BLOB_CONNECTION_STRING základě hodnot uložených v Key Vault Azure. Pokud tento kurz spouštíte ve vlastním prostředí, ujistěte se, že jste nastavili tyto proměnné prostředí, než budete pokračovat.

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Teď přečteme ANOMALY_API_KEY proměnné prostředí a a BLOB_CONNECTION_STRING nastavíme containerName proměnné a location .

# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

Nejprve se připojíme k našemu účtu úložiště, aby detektor anomálií mohl uložit průběžné výsledky:

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

Pojďme importovat všechny potřebné moduly.

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.cognitive import *

Teď načteme ukázková data do datového rámce Sparku.

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

Teď můžeme vytvořit estimator objekt, který slouží k trénování modelu. Určíme počáteční a koncový čas trénovacích dat. Určíme také vstupní sloupce, které se mají použít, a název sloupce, který obsahuje časová razítka. Nakonec určíme počet datových bodů, které se mají použít v posuvném okně detekce anomálií, a nastavíme připojovací řetězec na Azure Blob Storage Account.

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

Teď, když jsme vytvořili estimator, ho přizpůsobíme datům:

model = estimator.fit(df)

Po dokončení trénování teď můžeme model použít k odvozování. Kód v další buňce určuje počáteční a koncový čas dat, u kterých chceme zjistit anomálie.

inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

Když jsme volali .show(5) v předchozí buňce, zobrazilo se prvních pět řádků v datovém rámci. Výsledky byly všechny null , protože nebyly uvnitř okna odvozovat.

Pokud chcete zobrazit výsledky jenom pro odvozená data, vyberte sloupce, které potřebujeme. Potom můžeme řádky v datovém rámci seřadit vzestupně a vyfiltrovat výsledek tak, aby zobrazoval jenom řádky, které jsou v oblasti okna odvozování. V našem případě inferenceEndTime je stejný jako poslední řádek v datovém rámci, takže to můžete ignorovat.

Abychom mohli lépe vykreslovat výsledky, převeďte datový rámec Sparku na datový rámec Pandas.

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

Teď naformátujme contributors sloupec, který ukládá skóre příspěvku z každého senzoru na zjištěné anomálie. Další buňka tato data naformátuje a rozdělí skóre příspěvku každého senzoru do vlastního sloupce.

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}


rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

Výborně! Teď máme skóre příspěvků senzorů 1, 2 a 3 ve series_0sloupcích , series_1a series_2 v uvedeném pořadí.

Pojďme spustit další buňku, abychom vykreslili výsledky. Parametr minSeverity na prvním řádku určuje minimální závažnost anomálií, které se mají vykreslit.

minSeverity = 0.1


####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

Snímek obrazovky s grafem výsledků detekce anomálií s více proměnnými

Grafy zobrazují nezpracovaná data ze senzorů (uvnitř odvozujícího okna) oranžově, zeleně a modře. Červené svislé čáry na prvním obrázku znázorňují zjištěné anomálie, které mají závažnost větší nebo rovnou minSeverity.

Druhý graf ukazuje skóre závažnosti všech zjištěných anomálií s minSeverity prahovou hodnotou zobrazenou tečkovanou červenou čárou.

Nakonec poslední graf znázorňuje příspěvek dat z jednotlivých senzorů ke zjištěným anomáliím. Pomáhá nám diagnostikovat a pochopit nejpravděpodobnější příčinu každé anomálie.

Další kroky