Rezept: Multivariate Anomalieerkennung mit Isolationsgesamtstruktur
Dieses Rezept zeigt, wie Sie SynapseML in Apache Spark für die Erkennung multivariater Anomalien verwenden können. Multivariate Anomalieerkennung ermöglicht die Erkennung von Anomalien zwischen vielen Variablen oder Zeitreihen, wobei alle Interkorrelationen und Abhängigkeiten zwischen den verschiedenen Variablen berücksichtigt werden. In diesem Szenario verwenden wir SynapseML, um ein Isolationsgesamtstrukturmodell für die Erkennung multivariater Anomalien zu trainieren. Anschließend verwenden wir das trainierte Modell, um multivariate Anomalien innerhalb eines Datasets abzuleiten, das synthetische Messungen von drei IoT-Sensoren enthält.
Weitere Informationen zum Isolation Forest-Modell finden Sie in der Originalarbeit von Liu et al..
Voraussetzungen
- Schließen Sie Ihr Notizbuch an ein Lakehouse an. Wählen Sie auf der linken Seite Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein Lakehouse zu erstellen.
Bibliotheksimporte
from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from synapse.ml.isolationforest import *
from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
from synapse.ml.core.platform import *
if running_on_synapse():
shell = TerminalInteractiveShell.instance()
shell.define_macro("foo", """a,b=10,20""")
Eingabedaten
# Table inputs
timestampColumn = "timestamp" # str: the name of the timestamp column in the table
inputCols = [
"sensor_1",
"sensor_2",
"sensor_3",
] # list(str): the names of the input variables
# Training Start time, and number of days to use for training:
trainingStartTime = (
"2022-02-24T06:00:00Z" # datetime: datetime for when to start the training
)
trainingEndTime = (
"2022-03-08T23:55:00Z" # datetime: datetime for when to end the training
)
inferenceStartTime = (
"2022-03-09T09:30:00Z" # datetime: datetime for when to start the training
)
inferenceEndTime = (
"2022-03-20T23:55:00Z" # datetime: datetime for when to end the training
)
# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0
Lesen von Daten
df = (
spark.read.format("csv")
.option("header", "true")
.load(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
)
)
Umwandeln von Spalten in geeignete Datentypen
df = (
df.orderBy(timestampColumn)
.withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
.drop("_c5")
)
display(df)
Vorbereitung von Trainingsdaten
# filter to data with timestamps within the training window
df_train = df.filter(
(F.col(timestampColumn) >= trainingStartTime)
& (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)
Vorbereitung von Testdaten
# filter to data with timestamps within the inference window
df_test = df.filter(
(F.col(timestampColumn) >= inferenceStartTime)
& (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)
Modell der Isolationsgesamtstruktur trainieren
isolationForest = (
IsolationForest()
.setNumEstimators(num_estimators)
.setBootstrap(False)
.setMaxSamples(max_samples)
.setMaxFeatures(max_features)
.setFeaturesCol("features")
.setPredictionCol("predictedLabel")
.setScoreCol("outlierScore")
.setContamination(contamination)
.setContaminationError(0.01 * contamination)
.setRandomSeed(1)
)
Als Nächstes erstellen wir eine ML-Pipeline, um das Isolationsgesamtstrukturmodell zu trainieren. Außerdem wird veranschaulicht, wie Sie ein MLflow-Experiment erstellen und das trainierte Modell registrieren.
Die MLflow-Modellregistrierung ist nur erforderlich, wenn zu einem späteren Zeitpunkt auf das trainierte Modell zugegriffen wird. Für das Trainieren des Modells und das Ausführen von Rückschlüssen im selben Notebook ist das Modellobjektmodell ausreichend.
va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)
Ausführen von Rückschlüssen
Laden des trainierten Isolationsgesamtstrukturmodells
Ausführen von Rückschlüssen
df_test_pred = model.transform(df_test)
display(df_test_pred)