Détection d'anomalie multivariée avec forêt d'isolement

Cet article montre comment vous pouvez utiliser SynapseML sur Apache Spark pour la détection d'anomalie multivariée. La détection d'anomalies multivariées permet de détecter des anomalies parmi de nombreuses variables ou séries temporelles, en tenant compte de toutes les intercorrélations et dépendances entre les différentes variables. Dans ce scénario, nous utilisons SynapseML pour former un modèle de forêt d'isolement pour la détection d'anomalies multivariées, puis nous utilisons le modèle formé pour déduire des anomalies multivariées dans un ensemble de données contenant des mesures synthétiques de trois capteurs IoT.

Pour en savoir plus sur le modèle Isolation Forest, reportez-vous à l'article original de Liu et al..

Prérequis

  • Attachez votre cahier à une cabane au bord du lac. Sur le côté gauche, sélectionnez Ajouter pour ajouter une maison de lac existante ou créer une maison de lac.

Importations de bibliothèque

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *

if running_on_synapse():
    shell = TerminalInteractiveShell.instance()
    shell.define_macro("foo", """a,b=10,20""")

Données d’entrée

# Table inputs
timestampColumn = "timestamp"  # str: the name of the timestamp column in the table
inputCols = [
    "sensor_1",
    "sensor_2",
    "sensor_3",
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2022-02-24T06:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2022-03-08T23:55:00Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2022-03-09T09:30:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2022-03-20T23:55:00Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0

Lire les données

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
    )
)

convertir les colonnes en types de données appropriés

df = (
    df.orderBy(timestampColumn)
    .withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
    .drop("_c5")
)

display(df)

Préparation des données de formation

# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

Préparation des données de test

# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

Former le modèle de forêt d'isolement

isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.01 * contamination)
    .setRandomSeed(1)
)

Ensuite, nous créons un pipeline ML pour former le modèle Isolation Forest. Nous montrons également comment créer une expérience MLflow et enregistrer le modèle formé.

L'enregistrement du modèle MLflow n'est strictement requis que si vous accédez ultérieurement au modèle formé. Pour entraîner le modèle et effectuer des inférences dans le même bloc-notes, le modèle d'objet de modèle est suffisant.

va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)

Effectuer des inférences

Charger le modèle de forêt d'isolement formé

Effectuer des inférences

df_test_pred = model.transform(df_test)
display(df_test_pred)

Détecteur d'anomalies prédéfini

Azure AI Détecteur d'anomalies

  • État d’anomalie du point le plus récent : génère un modèle à l’aide des points précédents et détermine si le point le plus récent est anormal (Scala, Python)
  • Rechercher des anomalies : génère un modèle à l’aide d’une série entière et recherche des anomalies dans la série (Scala, Python)