Recette : Azure AI services – Détecteur d’anomalie multivariée

Cette recette montre comment utiliser SynapseML et Azure AI services sur Apache Spark pour la détection d’anomalie multivariée. La détection d’anomalie multivariée permet de détecter des anomalies parmi de nombreuses variables ou séries chronologiques, en tenant compte de toutes les intercorrélations et dépendances entre les différentes variables. Dans ce scénario, nous utilisons SynapseML pour effectuer l’apprentissage d’un modèle pour la détection d’anomalie multivariée à l’aide d’Azure AI services, puis nous utilisons le modèle pour déduire des anomalies multivariées dans un jeu de données contenant des mesures synthétiques de trois capteurs IoT.

Important

À partir du 20 septembre 2023, vous ne pourrez plus créer de ressources Détecteur d’anomalies. Le service Détecteur d’anomalies sera supprimé le 1er octobre 2026.

Pour en savoir plus sur Azure AI Détecteur d’anomalies, reportez-vous à cette page de documentation.

Prérequis

  • Un abonnement Azure - En créer un gratuitement
  • Attachez votre cahier à une cabane au bord du lac. Sur le côté gauche, sélectionnez Ajouter pour ajouter une maison de lac existante ou créer une maison de lac.

Programme d’installation

Suivez les instructions pour créer une ressource Anomaly Detector à l’aide du portail Azure ou vous pouvez utiliser Azure CLI pour créer cette ressource.

Après avoir configuré un Anomaly Detector, vous pouvez explorer les méthodes de gestion des données de différents formulaires. Le catalogue de services d'Azure AI propose plusieurs options Vision, Parole, Langage, Recherche Web, Décision, Traduction, et Intelligence documentaire.

Créer une ressource Détecteur d’anomalies

  • Dans le portail Azure, sélectionnez Créer dans votre groupe de ressources, puis tapez Détecteur d'anomalies. Sélectionnez la ressource Détecteur d'anomalies.
  • Donnez un nom à la ressource et, dans l’idéal, utilisez la même région que le reste de votre groupe de ressources. Utilisez les options par défaut pour le reste, puis sélectionnez Vérifier + Créer, puis Créer.
  • Une fois la ressource Détecteur d’anomalies créée, ouvrez-la et sélectionnez le panneau Keys and Endpoints dans le volet de navigation de gauche. Copiez la clé de la ressource Détecteur d'anomalies dans la variable d’environnement ANOMALY_API_KEY ou stockez-la dans la variable anomalyKey.

Créer une ressource de compte de stockage

Pour enregistrer les données intermédiaires, vous devez créer un compte Stockage Blob Azure. Dans ce compte de stockage, créez un conteneur pour stocker les données intermédiaires. Notez le nom du conteneur et copiez la chaîne de connexion dans ce conteneur. Vous en aurez besoin ultérieurement pour remplir la variable containerName et la variable d’environnement BLOB_CONNECTION_STRING.

Entrer vos clés de service

Commençons par configurer les variables d’environnement pour nos clés de service. La cellule suivante définit les variables d’environnement ANOMALY_API_KEY et BLOB_CONNECTION_STRING en fonction des valeurs stockées dans notre Azure Key Vault. Si vous exécutez ce tutoriel dans votre propre environnement, veillez à définir ces variables d’environnement avant de continuer.

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

À présent, nous allons lire les variables d’environnement ANOMALY_API_KEY et BLOB_CONNECTION_STRING, et définir les variables containerName et location.

# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

Tout d’abord, nous nous connectons à notre compte de stockage afin que le détecteur d’anomalies puisse y enregistrer les résultats intermédiaires :

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

Nous allons importer tous les modules nécessaires.

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.cognitive import *

À présent, nous allons lire nos exemples de données dans un DataFrame Spark.

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

Nous pouvons maintenant créer un objet estimator, qui est utilisé pour effectuer l'apprentissage de notre modèle. Nous spécifions les heures de début et de fin pour les données d’apprentissage. Nous spécifions également les colonnes d’entrée à utiliser et le nom de la colonne qui contient les timestamps. Enfin, nous spécifions le nombre de points de données à utiliser dans la fenêtre glissante de détection d’anomalie, et nous définissons la chaîne de connexion sur le compte Stockage Blob Azure.

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

Maintenant que nous avons créé le estimator, nous allons l’ajuster aux données :

model = estimator.fit(df)
```parameter

Once the training is done, we can now use the model for inference. The code in the next cell specifies the start and end times for the data we would like to detect the anomalies in.

```python
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

Lorsque nous avons appelé .show(5) dans la cellule précédente, les cinq premières lignes du dataframe nous ont été affichées. Les résultats étaient tous null, car ils n’étaient pas à l’intérieur de la fenêtre d’inférence.

Pour afficher les résultats uniquement pour les données déduites, nous allons sélectionner les colonnes dont nous avons besoin. Nous pouvons ensuite classer les lignes dans le dataframe par ordre croissant et filtrer le résultat pour afficher uniquement les lignes qui se trouvent dans la plage de la fenêtre d’inférence. Dans notre cas, inferenceEndTime est identique à la dernière ligne du dataframe, donc il peut ignorer cela.

Enfin, pour mieux tracer les résultats, nous allons convertir le dataframe Spark en dataframe Pandas.

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

Mettez en forme la colonne contributors qui stocke le score de contribution de chaque capteur pour les anomalies détectées. La cellule suivante met en forme ces données et fractionne le score de contribution de chaque capteur dans sa propre colonne.

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}

rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

Très bien ! Nous avons maintenant les scores de contribution des capteurs 1, 2 et 3 dans les colonnes series_0, series_1 et series_2 respectivement.

Exécutez la cellule suivante pour tracer les résultats. Le paramètre minSeverity spécifie la gravité minimale des anomalies à tracer.

minSeverity = 0.1

####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

Screenshot of multivariate anomaly detection results plot

Les tracés affichent les données brutes des capteurs (à l’intérieur de la fenêtre d’inférence) en orange, vert et bleu. Les lignes verticales rouges de la première figure montrent les anomalies détectées dont la gravité est supérieure ou égale à minSeverity.

Le deuxième tracé montre le score de gravité de toutes les anomalies détectées, avec le seuil minSeverity indiqué sur la ligne rouge en pointillé.

Enfin, le dernier tracé montre la contribution des données de chaque capteur aux anomalies détectées. Il nous aide à diagnostiquer et à comprendre la cause la plus probable de chaque anomalie.