Rezept: Multivariate Anomalieerkennung mit Isolationsgesamtstruktur

Dieses Rezept zeigt, wie Sie SynapseML in Apache Spark für die Erkennung multivariater Anomalien verwenden können. Multivariate Anomalieerkennung ermöglicht die Erkennung von Anomalien zwischen vielen Variablen oder Zeitreihen, wobei alle Interkorrelationen und Abhängigkeiten zwischen den verschiedenen Variablen berücksichtigt werden. In diesem Szenario verwenden wir SynapseML, um ein Isolationsgesamtstrukturmodell für die Erkennung multivariater Anomalien zu trainieren. Anschließend verwenden wir das trainierte Modell, um multivariate Anomalien innerhalb eines Datasets abzuleiten, das synthetische Messungen von drei IoT-Sensoren enthält.

Weitere Informationen zum Isolation Forest-Modell finden Sie in der Originalarbeit von Liu et al..

Voraussetzungen

  • Schließen Sie Ihr Notizbuch an ein Lakehouse an. Wählen Sie auf der linken Seite Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein Lakehouse zu erstellen.

Bibliotheksimporte

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *

if running_on_synapse():
    shell = TerminalInteractiveShell.instance()
    shell.define_macro("foo", """a,b=10,20""")

Eingabedaten

# Table inputs
timestampColumn = "timestamp"  # str: the name of the timestamp column in the table
inputCols = [
    "sensor_1",
    "sensor_2",
    "sensor_3",
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2022-02-24T06:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2022-03-08T23:55:00Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2022-03-09T09:30:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2022-03-20T23:55:00Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0

Lesen von Daten

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
    )
)

Umwandeln von Spalten in geeignete Datentypen

df = (
    df.orderBy(timestampColumn)
    .withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
    .drop("_c5")
)

display(df)

Vorbereitung von Trainingsdaten

# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

Vorbereitung von Testdaten

# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

Modell der Isolationsgesamtstruktur trainieren

isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.01 * contamination)
    .setRandomSeed(1)
)

Als Nächstes erstellen wir eine ML-Pipeline, um das Isolationsgesamtstrukturmodell zu trainieren. Außerdem wird veranschaulicht, wie Sie ein MLflow-Experiment erstellen und das trainierte Modell registrieren.

Die MLflow-Modellregistrierung ist nur erforderlich, wenn zu einem späteren Zeitpunkt auf das trainierte Modell zugegriffen wird. Für das Trainieren des Modells und das Ausführen von Rückschlüssen im selben Notebook ist das Modellobjektmodell ausreichend.

va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)

Ausführen von Rückschlüssen

Laden des trainierten Isolationsgesamtstrukturmodells

Ausführen von Rückschlüssen

df_test_pred = model.transform(df_test)
display(df_test_pred)

Nächste Schritte