Recette : Cognitive Services - Détection d’anomalie multivariée
Cette recette montre comment utiliser SynapseML et Azure Cognitive Services sur Apache Spark pour la détection d’anomalies multivariées. La détection d’anomalie multivariée permet de détecter des anomalies parmi de nombreuses variables ou séries chronologiques, en tenant compte de toutes les corrélations et dépendances entre les différentes variables. Dans ce scénario, nous utilisons SynapseML pour entraîner un modèle pour la détection d’anomalies multivariées à l’aide d’Azure Cognitive Services, puis nous utilisons le modèle pour déduire des anomalies multivariées au sein d’un jeu de données contenant des mesures synthétiques à partir de trois capteurs IoT.
Pour en savoir plus sur le Détecteur d'anomalies Cognitive Service, reportez-vous à cette page de documentation.
Prérequis
- Un abonnement Azure - En créer un gratuitement
- Joignez votre bloc-notes à un lakehouse. Sur le côté gauche, sélectionnez Ajouter pour ajouter un lakehouse existant ou créer un lakehouse.
Programme d’installation
Créer une ressource Détecteur d’anomalies
Suivez les instructions pour créer une Anomaly Detector
ressource à l’aide du Portail Azure ou vous pouvez également utiliser Azure CLI pour créer cette ressource.
- Dans le Portail Azure, cliquez sur
Create
votre groupe de ressources, puis tapezAnomaly Detector
. Cliquez sur la ressource Détecteur d'anomalies. - Donnez un nom à la ressource et utilisez idéalement la même région que le reste de votre groupe de ressources. Utilisez les options par défaut pour le reste, puis cliquez sur , puis
Create
surReview + Create
. - Une fois la ressource Détecteur d'anomalies créée, ouvrez-la et cliquez sur le
Keys and Endpoints
panneau à gauche. Copiez la clé de la ressource Détecteur d'anomalies dans laANOMALY_API_KEY
variable d’environnement ou stockez-la dans laanomalyKey
variable.
Créer une ressource de compte de stockage
Pour enregistrer des données intermédiaires, vous devez créer un compte Stockage Blob Azure. Dans ce compte de stockage, créez un conteneur pour stocker les données intermédiaires. Notez le nom du conteneur et copiez la chaîne de connexion dans ce conteneur. Vous en aurez besoin ultérieurement pour remplir la containerName
variable et la variable d’environnement BLOB_CONNECTION_STRING
.
Entrer vos clés de service
Commençons par configurer les variables d’environnement pour nos clés de service. La cellule suivante définit les variables d’environnement ANOMALY_API_KEY
et en BLOB_CONNECTION_STRING
fonction des valeurs stockées dans notre Key Vault Azure. Si vous exécutez ce didacticiel dans votre propre environnement, veillez à définir ces variables d’environnement avant de continuer.
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Maintenant, nous allons lire les variables d’environnement ANOMALY_API_KEY
et et BLOB_CONNECTION_STRING
définir les containerName
variables et location
.
# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
"wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"
Tout d’abord, nous nous connectons à notre compte de stockage afin que le détecteur d’anomalies puisse y enregistrer des résultats intermédiaires :
spark.sparkContext._jsc.hadoopConfiguration().set(
f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)
Importons tous les modules nécessaires.
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.cognitive import *
À présent, nous allons lire nos exemples de données dans un DataFrame Spark.
df = (
spark.read.format("csv")
.option("header", "true")
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)
df = (
df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)
# Let's inspect the dataframe:
df.show(5)
Nous pouvons maintenant créer un estimator
objet, qui est utilisé pour entraîner notre modèle. Nous spécifions les heures de début et de fin pour les données d’entraînement. Nous spécifions également les colonnes d’entrée à utiliser et le nom de la colonne qui contient les horodatages. Enfin, nous spécifions le nombre de points de données à utiliser dans la fenêtre glissante de détection des anomalies, et nous définissons la chaîne de connexion sur le compte Stockage Blob Azure.
trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
estimator = (
FitMultivariateAnomaly()
.setSubscriptionKey(anomalyKey)
.setLocation(location)
.setStartTime(trainingStartTime)
.setEndTime(trainingEndTime)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
)
Maintenant que nous avons créé le estimator
, nous allons l’ajuster aux données :
model = estimator.fit(df)
Une fois la formation terminée, nous pouvons maintenant utiliser le modèle pour l’inférence. Le code de la cellule suivante spécifie les heures de début et de fin pour les données dans laquelle nous souhaitons détecter les anomalies.
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"
result = (
model.setStartTime(inferenceStartTime)
.setEndTime(inferenceEndTime)
.setOutputCol("results")
.setErrorCol("errors")
.setInputCols(inputColumns)
.setTimestampCol(timestampColumn)
.transform(df)
)
result.show(5)
Lorsque nous avons appelé .show(5)
dans la cellule précédente, les cinq premières lignes de la trame de données nous ont été affichées. Les résultats étaient tous null
parce qu’ils n’étaient pas à l’intérieur de la fenêtre d’inférence.
Pour afficher les résultats uniquement pour les données déduites, sélectionnez les colonnes dont nous avons besoin. Nous pouvons ensuite classer les lignes de la trame de données par ordre croissant et filtrer le résultat pour afficher uniquement les lignes qui se trouvent dans la plage de la fenêtre d’inférence. Dans notre cas inferenceEndTime
, est identique à la dernière ligne de la trame de données. Vous pouvez donc l’ignorer.
Enfin, pour mieux tracer les résultats, convertissons le dataframe Spark en dataframe Pandas.
rdf = (
result.select(
"timestamp",
*inputColumns,
"results.contributors",
"results.isAnomaly",
"results.severity"
)
.orderBy("timestamp", ascending=True)
.filter(col("timestamp") >= lit(inferenceStartTime))
.toPandas()
)
rdf
Nous allons maintenant mettre en forme la contributors
colonne qui stocke le score de contribution de chaque capteur aux anomalies détectées. La cellule suivante met en forme ces données et fractionne le score de contribution de chaque capteur en sa propre colonne.
def parse(x):
if type(x) is list:
return dict([item[::-1] for item in x])
else:
return {"series_0": 0, "series_1": 0, "series_2": 0}
rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
[rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf
Très bien ! Nous avons maintenant les scores de contribution des capteurs 1, 2 et 3 dans les series_0
colonnes , series_1
et series_2
respectivement.
Nous allons exécuter la cellule suivante pour tracer les résultats. Le minSeverity
paramètre de la première ligne spécifie la gravité minimale des anomalies à tracer.
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
rdf["timestamp"],
rdf["sensor_1"],
color="tab:orange",
linestyle="solid",
linewidth=2,
label="sensor_1",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_2"],
color="tab:green",
linestyle="solid",
linewidth=2,
label="sensor_2",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_3"],
color="tab:blue",
linestyle="solid",
linewidth=2,
label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)
plt.legend()
plt.title(
"A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
rdf["timestamp"],
rdf["severity"],
color="black",
linestyle="solid",
linewidth=2,
label="Severity score",
)
plt.plot(
rdf["timestamp"],
[minSeverity] * len(rdf["severity"]),
color="red",
linestyle="dotted",
linewidth=1,
label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
rdf["timestamp"],
rdf["series_1"],
width=2,
color="tab:green",
label="sensor_2",
bottom=rdf["series_0"],
)
plt.bar(
rdf["timestamp"],
rdf["series_2"],
width=2,
color="tab:blue",
label="sensor_3",
bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
Les tracés affichent les données brutes des capteurs (à l’intérieur de la fenêtre d’inférence) en orange, vert et bleu. Les lignes verticales rouges de la première figure montrent les anomalies détectées dont la gravité est supérieure ou égale à minSeverity
.
Le deuxième tracé montre le score de gravité de toutes les anomalies détectées, avec le minSeverity
seuil indiqué dans la ligne rouge en pointillés.
Enfin, le dernier tracé montre la contribution des données de chaque capteur aux anomalies détectées. Il nous aide à diagnostiquer et à comprendre la cause la plus probable de chaque anomalie.