Compartir vía


Detección de anomalías multivariante con bosque de aislamiento

En este artículo se muestra cómo puede usar SynapseML en Apache Spark para la detección de anomalías multivariante. La detección de anomalías multivariante permite la detección de anomalías entre muchas variables o series de tiempo, teniendo en cuenta todas las correlaciones y dependencias entre las distintas variables. En este escenario, se usa SynapseML para entrenar un modelo de bosque de aislamiento para la detección de anomalías multivariante y, a continuación, se usa en el modelo entrenado para inferir anomalías multivariantes dentro de un conjunto de datos que contiene medidas sintéticas de tres sensores de IoT.

Para obtener más información sobre el modelo de Bosque de Aislamiento, consulte el documento original de Liu et al..

Requisitos previos

  • Adjunte el bloc de notas a una casa de lago. En el lado izquierdo, seleccione Añadir para añadir una casa de lago existente o crear una casa de lago.

Importaciones de biblioteca

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *

if running_on_synapse():
    shell = TerminalInteractiveShell.instance()
    shell.define_macro("foo", """a,b=10,20""")

Datos de entrada

# Table inputs
timestampColumn = "timestamp"  # str: the name of the timestamp column in the table
inputCols = [
    "sensor_1",
    "sensor_2",
    "sensor_3",
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2022-02-24T06:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2022-03-08T23:55:00Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2022-03-09T09:30:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2022-03-20T23:55:00Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0

Lectura de datos

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
    )
)

convertir columnas en tipos de datos adecuados

df = (
    df.orderBy(timestampColumn)
    .withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
    .drop("_c5")
)

display(df)

Entrenando la preparación de datos

# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

Preparación de datos de prueba

# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

Entrenamiento del modelo de bosque de aislamiento

isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.01 * contamination)
    .setRandomSeed(1)
)

A continuación, se crea una canalización de ML para entrenar el modelo de bosque de aislamiento. También se muestra cómo crear un experimento de MLflow y registrar el modelo entrenado.

El registro de modelos de MLflow solo es estrictamente necesario si accede al modelo entrenado más adelante. Para entrenar el modelo y realizar inferencias en el mismo cuaderno, el modelo de objetos del modelo es suficiente.

va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)

Realizar inferencias

Carga del modelo de bosque de aislamiento entrenado

Realizar inferencias

df_test_pred = model.transform(df_test)
display(df_test_pred)

Anomaly Detector predefinido

Detector de anomalía de Azure AI

  • Estado de anomalía del punto más reciente: genera un modelo mediante puntos anteriores y determina si el punto más reciente es anómalo (Scala, Python)
  • Búsqueda de anomalías: genera un modelo mediante una serie completa y busca anomalías en la serie (Scala, Python)