Condividi tramite


Istruzioni: Servizi di Azure AI - Rilevamento anomalie multivariato

Questa ricetta illustra come usare i servizi SynapseML e di Intelligenza artificiale di Azure, in Apache Spark, per il rilevamento di anomalie multivariate. Il rilevamento delle anomalie multivariate comporta il rilevamento di anomalie tra molte variabili o serie temporali, tenendo conto di tutte le correlazioni e delle dipendenze tra le diverse variabili. Questo scenario usa SynapseML e i servizi di intelligenza artificiale di Azure per eseguire il training di un modello per il rilevamento anomalie multivariato. Viene quindi usato il modello per dedurre anomalie multivariate all'interno di un set di dati che contiene misurazioni sintetiche da tre sensori IoT.

Importante

A partire dal 20 settembre 2023 non è possibile creare nuove risorse rilevamento anomalie. Il servizio Rilevamento anomalie verrà ritirato il 1° ottobre 2026.

Per ulteriori informazioni sul Rilevatore di anomalie di Azure AI, visitate la risorsa informativa Rilevatore di anomalie.

Prerequisiti

  • Una sottoscrizione di Azure: creare un account gratuitamente
  • Collegare il notebook a un lakehouse. Sul lato sinistro, selezionare Aggiungi per aggiungere un lakehouse esistente o creare un lakehouse.

Attrezzaggio

A partire da una risorsa esistente Anomaly Detector , è possibile esplorare i modi per gestire i dati di vari moduli. Il catalogo dei servizi all'interno di Intelligenza artificiale di Azure offre diverse opzioni:

Creare una risorsa Rilevamento anomalie

  • Nella portale di Azure selezionare Crea nel gruppo di risorse e quindi digitare Rilevamento anomalie. Selezionare la risorsa di Rilevamento anomalie.
  • Denominare la risorsa e idealmente usare la stessa area del resto del gruppo di risorse. Usare le opzioni predefinite per il resto, quindi selezionare Rivedi e crea e quindi Crea.
  • Dopo aver creato la risorsa Anomaly Detector, aprirla e selezionare il Keys and Endpoints pannello nel riquadro di navigazione a sinistra. Copiare la chiave per la risorsa Rilevamento anomalie nella variabile di ambiente ANOMALY_API_KEY oppure archiviarla nella variabile anomalyKey.

Creare una risorsa account di archiviazione

Per salvare i dati intermedi, è necessario creare un account di archiviazione BLOB di Azure. All'interno dell'account di archiviazione creare un contenitore per l'archiviazione dei dati intermedi. Prendere nota del nome del contenitore e copiare il stringa di connessione in tale contenitore. È necessario in un secondo momento popolare la variabile containerName e la variabile di ambiente BLOB_CONNECTION_STRING.

Immettere le chiavi del servizio

Prima di tutto, configurare le variabili di ambiente per le chiavi del servizio. La cella successiva imposta le variabili di ambiente ANOMALY_API_KEY e BLOB_CONNECTION_STRING in base ai valori archiviati nel nostro Azure Key Vault. Se si esegue questa esercitazione nel proprio ambiente, assicurarsi di impostare queste variabili di ambiente prima di procedere:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Leggere le variabili di ambiente ANOMALY_API_KEY e BLOB_CONNECTION_STRING e impostare le variabili containerName e location.

# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

Connettersi al nostro account di archiviazione, in modo che il rilevatore di anomalie possa salvare i risultati intermedi nel nostro account di archiviazione.

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

Importare tutti i moduli necessari:

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.cognitive import *

Leggere i dati di esempio in un dataframe Spark:

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

È ora possibile creare un estimator oggetto che verrà usato per eseguire il training del modello. Vengono specificate le ore di inizio e di fine per i dati di training. Si specificano anche le colonne di input da usare e il nome della colonna che contiene i timestamp. Infine, si specifica il numero di punti dati da usare nella finestra temporale scorrevole di rilevamento anomalie e si imposta la stringa di connessione sull'account di archiviazione BLOB di Azure:

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

Adattiamo estimator ai dati:

model = estimator.fit(df)

Al termine del training, è possibile usare il modello per l'inferenza. Il codice nella cella successiva specifica l'ora di inizio e di fine per i dati in cui si desidera rilevare le anomalie:

inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

Nella cella precedente, .show(5) ci ha mostrato le prime cinque righe del dataframe. I risultati erano tutti null perché sono caduti fuori dalla finestra di inferenza.

Per visualizzare i risultati solo per i dati dedotti, selezionare le colonne necessarie. È quindi possibile ordinare le righe nel dataframe in ordine crescente e filtrare il risultato in modo da visualizzare solo le righe nell'intervallo della finestra di inferenza. In questo caso, inferenceEndTime corrisponde all'ultima riga del dataframe, in modo da poterla ignorare.

Infine, per tracciare meglio i risultati, convertire il frame di dati Spark in un dataframe Pandas:

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

Formattare la colonna contributors che archivia il punteggio di contributo da ogni sensore alle anomalie rilevate. La cella successiva gestisce questa operazione e suddivide il punteggio di contributo di ogni sensore nella propria colonna:

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}

rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

Sono ora disponibili i punteggi di contributo dei sensori 1, 2 e 3 rispettivamente nelle colonne series_0, series_1 e series_2.

Per tracciare i risultati, eseguire la cella successiva. Il minSeverity parametro specifica la gravità minima delle anomalie da tracciare:

minSeverity = 0.1

####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

Screenshot del tracciato dei risultati del rilevamento anomalie a più varianti.

I tracciati mostrano i dati non elaborati dei sensori (all'interno della finestra di inferenza) in arancione, verde e blu. Le linee verticali rosse nella prima figura mostrano le anomalie rilevate con una gravità maggiore o uguale a minSeverity.

Il secondo grafico mostra il punteggio di gravità di tutte le anomalie rilevate, con la soglia minSeverity visualizzata nella linea rossa punteggiata.

Infine, l'ultimo tracciato mostra il contributo dei dati di ogni sensore alle anomalie rilevate. Ci aiuta a diagnosticare e comprendere la causa più probabile di ogni anomalia.