Deteksi Anomali Multivariat dengan Hutan Isolasi

Artikel ini memperlihatkan bagaimana Anda dapat menggunakan SynapseML di Apache Spark untuk deteksi anomali multivariat. Deteksi anomali multivariat memungkinkan deteksi anomali di antara banyak variabel atau timeseries, dengan mempertimbangkan semua inter-korelasi dan dependensi antara variabel yang berbeda. Dalam skenario ini, kami menggunakan SynapseML untuk melatih model Hutan Isolasi untuk deteksi anomali multivariat, dan kami kemudian menggunakan model terlatih untuk menyimpulkan anomali multivariat dalam himpunan data yang berisi pengukuran sintetis dari tiga sensor IoT.

Untuk mempelajari lebih lanjut tentang model Hutan Isolasi, lihat makalah asli oleh Liu et al..

Prasyarat

  • Lampirkan buku catatan Anda ke lakehouse. Di sisi kiri, pilih Tambahkan untuk menambahkan lakehouse yang ada atau buat lakehouse.

Impor pustaka

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *

if running_on_synapse():
    shell = TerminalInteractiveShell.instance()
    shell.define_macro("foo", """a,b=10,20""")

Memasukan data

# Table inputs
timestampColumn = "timestamp"  # str: the name of the timestamp column in the table
inputCols = [
    "sensor_1",
    "sensor_2",
    "sensor_3",
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2022-02-24T06:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2022-03-08T23:55:00Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2022-03-09T09:30:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2022-03-20T23:55:00Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0

Membaca data

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
    )
)

melemparkan kolom ke jenis data yang sesuai

df = (
    df.orderBy(timestampColumn)
    .withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
    .drop("_c5")
)

display(df)

Persiapan data pelatihan

# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

Menguji persiapan data

# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

Melatih model Hutan Isolasi

isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.01 * contamination)
    .setRandomSeed(1)
)

Selanjutnya, kita membuat alur ML untuk melatih model Hutan Isolasi. Kami juga menunjukkan cara membuat eksperimen MLflow dan mendaftarkan model terlatih.

Pendaftaran model MLflow sangat diperlukan jika mengakses model terlatih di lain waktu. Untuk melatih model, dan melakukan inferensi di notebook yang sama, model objek model sudah cukup.

va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)

Melakukan inferensi

Memuat Model Hutan Isolasi terlatih

Melakukan inferensi

df_test_pred = model.transform(df_test)
display(df_test_pred)

Detektor Anomali Premade

Detektor Anomali Azure AI

  • Status anomali titik terbaru: menghasilkan model menggunakan titik sebelumnya dan menentukan apakah titik terbaru anomali (Scala, Python)
  • Menemukan anomali: menghasilkan model menggunakan seluruh seri dan menemukan anomali dalam seri (Scala, Python)