Multivariatavvikelseidentifiering med isoleringsskog

Den här artikeln visar hur du kan använda SynapseML på Apache Spark för multivariatavvikelseidentifiering. Multivariatavvikelseidentifiering möjliggör identifiering av avvikelser mellan många variabler eller tidsserier, med hänsyn till alla interkorrelationer och beroenden mellan de olika variablerna. I det här scenariot använder vi SynapseML för att träna en isoleringsskogsmodell för multivariatavvikelseidentifiering, och sedan använder vi den tränade modellen för att härleda avvikelser i flera variater i en datauppsättning som innehåller syntetiska mätningar från tre IoT-sensorer.

Mer information om modellen Isoleringsskog finns i originaldokumentet från Liu et al..

Förutsättningar

  • Bifoga anteckningsboken till ett sjöhus. Till vänster väljer du Lägg till för att lägga till ett befintligt sjöhus eller skapa ett sjöhus.

Biblioteksimport

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *

if running_on_synapse():
    shell = TerminalInteractiveShell.instance()
    shell.define_macro("foo", """a,b=10,20""")

Indata

# Table inputs
timestampColumn = "timestamp"  # str: the name of the timestamp column in the table
inputCols = [
    "sensor_1",
    "sensor_2",
    "sensor_3",
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2022-02-24T06:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2022-03-08T23:55:00Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2022-03-09T09:30:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2022-03-20T23:55:00Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0

Läsa data

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
    )
)

omvandla kolumner till lämpliga datatyper

df = (
    df.orderBy(timestampColumn)
    .withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
    .drop("_c5")
)

display(df)

Förberedelse av träningsdata

# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

Testdataförberedelse

# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

Träna isoleringsskogsmodell

isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.01 * contamination)
    .setRandomSeed(1)
)

Sedan skapar vi en ML-pipeline för att träna modellen Isoleringsskog. Vi visar också hur du skapar ett MLflow-experiment och registrerar den tränade modellen.

MLflow-modellregistrering krävs endast om du kommer åt den tränade modellen vid ett senare tillfälle. För att träna modellen och utföra slutsatsdragning i samma notebook-fil räcker modellobjektmodellen.

va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)

Utföra slutsatsdragning

Läs in den tränade isoleringsskogsmodellen

Utföra slutsatsdragning

df_test_pred = model.transform(df_test)
display(df_test_pred)

Färdiga Avvikelseidentifiering

Azure AI-Avvikelseidentifiering

  • Avvikelsestatus för den senaste punkten: genererar en modell med föregående punkter och avgör om den senaste punkten är avvikande (Scala, Python)
  • Hitta avvikelser: genererar en modell med en hel serie och hittar avvikelser i serien (Scala, Python)