Opskrift: Azure AI-tjenester – Registrering af flerdimensionel uregelmæssighed

Denne opskrift viser, hvordan du kan bruge SynapseML- og Azure AI-tjenester på Apache Spark til registrering af flerdimensionel uregelmæssigheder. Registrering af flerdimensionelle uregelmæssigheder gør det muligt at registrere uregelmæssigheder blandt mange variabler eller tidsserier under hensyntagen til alle interkorrelationer og afhængigheder mellem de forskellige variabler. I dette scenarie bruger vi SynapseML til at oplære en model til registrering af flerdimensionel uregelmæssigheder ved hjælp af Azure AI-tjenesterne, og vi bruger derefter modellen til at udlede flervariationer i et datasæt, der indeholder syntetiske målinger fra tre IoT-sensorer.

Vigtigt

Fra og med den 20. september 2023 kan du ikke oprette nye Afvigelsesregistrering ressourcer. Den Afvigelsesregistrering service udgår den 1. oktober 2026.

Du kan få mere at vide om Azure AI-Afvigelsesregistrering på denne dokumentationsside.

Forudsætninger

  • Et Azure-abonnement – Opret et gratis
  • Vedhæft din notesbog til et lakehouse. I venstre side skal du vælge Tilføj for at tilføje et eksisterende lakehouse eller oprette et lakehouse.

Opsætte

Følg vejledningen for at oprette en Anomaly Detector ressource ved hjælp af Azure-portal eller alternativt kan du også bruge kommandolinjegrænsefladen i Azure til at oprette denne ressource.

Når du har konfigureret en Anomaly Detector, kan du udforske metoder til håndtering af data i forskellige formularer. Kataloget over tjenester i Azure AI indeholder flere muligheder: Vision, Speech, Language, Web search, Decision, Translation og Document Intelligence.

Opret en Afvigelsesregistrering ressource

  • I Azure-portal skal du vælge Opret i din ressourcegruppe og derefter skrive Afvigelsesregistrering. Vælg den Afvigelsesregistrering ressource.
  • Giv ressourcen et navn, og brug ideelt set det samme område som resten af ressourcegruppen. Brug standardindstillingerne for resten, og vælg derefter Gennemse + Opret og derefter Opret.
  • Når Afvigelsesregistrering ressourcen er oprettet, skal du åbne den og vælge Keys and Endpoints panelet i venstre navigationsrude. Kopiér nøglen for den Afvigelsesregistrering ressource til miljøvariablenANOMALY_API_KEY, eller gem den i variablenanomalyKey.

Opret en lagerkontoressource

Hvis du vil gemme mellemliggende data, skal du oprette en Azure Blob Storage-konto. Inden for denne lagerkonto skal du oprette en objektbeholder til lagring af de mellemliggende data. Notér navnet på objektbeholderen, og kopiér forbindelsesstreng til den pågældende objektbeholder. Du skal bruge den senere for at udfylde variablen containerName og miljøvariablen BLOB_CONNECTION_STRING .

Angiv dine servicenøgler

Lad os starte med at konfigurere miljøvariabler for vores servicenøgler. Den næste celle angiver ANOMALY_API_KEY miljøvariablerne og BLOB_CONNECTION_STRING baseret på de værdier, der er gemt i vores Azure Key Vault. Hvis du kører dette selvstudium i dit eget miljø, skal du sørge for at angive disse miljøvariabler, før du fortsætter.

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Lad os nu læse ANOMALY_API_KEY miljøvariabler og BLOB_CONNECTION_STRING angive variablerne containerName og location .

# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

Først opretter vi forbindelse til vores lagerkonto, så uregelmæssighedsregistreringen kan gemme mellemliggende resultater der:

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

Lad os importere alle de nødvendige moduler.

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.cognitive import *

Lad os nu læse vores eksempeldata i en Spark DataFrame.

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

Vi kan nu oprette et estimator objekt, som bruges til at oplære vores model. Vi angiver start- og sluttidspunkterne for oplæringsdataene. Vi angiver også de inputkolonner, der skal bruges, og navnet på den kolonne, der indeholder tidsstemplet. Endelig angiver vi antallet af datapunkter, der skal bruges i vinduet med glidende registrering af uregelmæssigheder, og vi angiver forbindelsesstreng til Azure Blob Storage-kontoen.

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

Nu, hvor vi har oprettet estimator, kan vi tilpasse den til dataene:

model = estimator.fit(df)
```parameter

Once the training is done, we can now use the model for inference. The code in the next cell specifies the start and end times for the data we would like to detect the anomalies in.

```python
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

Da vi ringede til .show(5) den forrige celle, viste den os de første fem rækker i datarammen. Resultaterne var alle null , fordi de ikke var inde i inferensvinduet.

Hvis du kun vil have vist resultaterne for de udledte data, kan du vælge de kolonner, vi har brug for. Vi kan derefter sortere rækkerne i datarammen ved hjælp af stigende rækkefølge og filtrere resultatet, så det kun er de rækker, der er i området i inferensvinduet, der vises. I vores tilfælde inferenceEndTime er det samme som den sidste række i datarammen, så det kan ignoreres.

Endelig kan du bedre afbilde resultaterne ved at konvertere Spark-datarammen til en Pandas-dataramme.

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

Formatér den contributors kolonne, der gemmer bidragsscoren fra hver sensor, til de registrerede uregelmæssigheder. Den næste celle formaterer disse data og opdeler hver sensors bidragsscore i sin egen kolonne.

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}

rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

Glimrende! Vi har nu bidragsscores for sensorer henholdsvis 1, 2 og 3 i kolonnerne series_0, series_1og series_2 .

Kør den næste celle for at afbilde resultaterne. Parameteren minSeverity angiver minimum alvorsgraden af de uregelmæssigheder, der skal afbildes.

minSeverity = 0.1

####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

Screenshot of multivariate anomaly detection results plot

Afbildningerne viser rådata fra sensorerne (inde i inferensvinduet) i orange, grøn og blå. De røde lodrette linjer i det første tal viser de registrerede uregelmæssigheder, der har en alvorsgrad, der er større end eller lig med minSeverity.

I det andet afbildning vises alvorsgradsscoren for alle de registrerede uregelmæssigheder, hvor minSeverity tærsklen vises på den stiplede røde linje.

Endelig viser det sidste plot bidraget fra dataene fra hver sensor til de registrerede uregelmæssigheder. Det hjælper os med at diagnosticere og forstå den mest sandsynlige årsag til hver uregelmæssighed.