Receta: Cognitive Services- Detección de anomalías multivariante

Esta receta muestra cómo puede usar SynapseML y Azure Cognitive Services en Apache Spark para la detección de anomalías multivariante. La detección de anomalías multivariante permite la detección de anomalías entre muchas variables o series de tiempo, teniendo en cuenta todas las correlaciones y dependencias entre las distintas variables. En este escenario, usamos SynapseML para entrenar un modelo para la detección de anomalías multivariante mediante Azure Cognitive Services y, a continuación, usamos para el modelo para deducir anomalías multivariantes dentro de un conjunto de datos que contiene medidas sintéticas de tres sensores de IoT.

Para más información sobre la Anomaly Detector Cognitive Services, consulte esta página de documentación.

Requisitos previos

  • Una suscripción a Azure: cree una cuenta gratuita.
  • Adjunte el bloc de notas a una casa de lago. En el lado izquierdo, seleccione Agregar para agregar un lago existente o crear un lago.

Configurar

Creación de un recurso de Anomaly Detector

Siga las instrucciones para crear un Anomaly Detector recurso mediante el Azure Portal o, como alternativa, también puede usar la CLI de Azure para crear este recurso.

  • En el Azure Portal, haga clic Create en el grupo de recursos y escriba Anomaly Detector. Haga clic en el recurso Anomaly Detector.
  • Asigne un nombre al recurso y, idealmente, use la misma región que el resto del grupo de recursos. Use las opciones predeterminadas para el resto y, a continuación, haga clic en Review + Create y, a continuación, Create.
  • Una vez creado el recurso Anomaly Detector, ábralo y haga clic en el Keys and Endpoints panel de la izquierda. Copie la clave del recurso Anomaly Detector en la ANOMALY_API_KEY variable de entorno o almacénela en la anomalyKey variable .

Creación de un recurso de cuenta de almacenamiento

Para guardar los datos intermedios, debe crear una cuenta de Azure Blob Storage. Dentro de esa cuenta de almacenamiento, cree un contenedor para almacenar los datos intermedios. Anote el nombre del contenedor y copie la cadena de conexión en ese contenedor. Lo necesitará más adelante para rellenar la containerName variable y la variable de BLOB_CONNECTION_STRING entorno.

Escriba las claves del servicio.

Comencemos configurando las variables de entorno para nuestras claves de servicio. La celda siguiente establece las variables de ANOMALY_API_KEY entorno y BLOB_CONNECTION_STRING en función de los valores almacenados en nuestra Key Vault de Azure. Si está ejecutando este tutorial en su propio entorno, asegúrese de establecer estas variables de entorno antes de continuar.

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Ahora, permite leer las variables de ANOMALY_API_KEY entorno y BLOB_CONNECTION_STRING y establecer las containerName variables y location .

# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

En primer lugar, nos conectamos a nuestra cuenta de almacenamiento para que el detector de anomalías pueda guardar los resultados intermedios allí:

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

Vamos a importar todos los módulos necesarios.

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.cognitive import *

Ahora, vamos a leer los datos de ejemplo en un dataframe de Spark.

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

Ahora podemos crear un estimator objeto , que se usa para entrenar el modelo. Especificamos las horas de inicio y finalización de los datos de entrenamiento. También se especifican las columnas de entrada que se van a usar y el nombre de la columna que contiene las marcas de tiempo. Por último, especificamos el número de puntos de datos que se van a usar en la ventana deslizante de detección de anomalías y establecemos la cadena de conexión en la cuenta de Azure Blob Storage.

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

Ahora que hemos creado , estimatorvamos a ajustarlo a los datos:

model = estimator.fit(df)

Una vez hecho el entrenamiento, ahora podemos usar el modelo para la inferencia. El código de la celda siguiente especifica las horas de inicio y finalización de los datos en los que nos gustaría detectar las anomalías.

inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

Cuando llamamos .show(5) a en la celda anterior, nos mostró las cinco primeras filas de la trama de datos. Los resultados fueron todos null porque no estaban dentro de la ventana de inferencia.

Para mostrar los resultados solo para los datos deducidos, permite seleccionar las columnas que necesitamos. Después, podemos ordenar las filas del dataframe por orden ascendente y filtrar el resultado para mostrar solo las filas que se encuentran en el intervalo de la ventana de inferencia. En nuestro caso inferenceEndTime , es igual que la última fila de la trama de datos, por lo que puede omitirlo.

Por último, para poder trazar mejor los resultados, permite convertir la trama de datos de Spark en una trama de datos de Pandas.

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

Ahora vamos a dar formato a la contributors columna que almacena la puntuación de contribución de cada sensor a las anomalías detectadas. La celda siguiente da formato a estos datos y divide la puntuación de contribución de cada sensor en su propia columna.

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}


rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

Magnífico. Ahora tenemos las puntuaciones de contribución de los sensores 1, 2 y 3 en las series_0columnas , series_1y series_2 respectivamente.

Vamos a ejecutar la celda siguiente para trazar los resultados. El minSeverity parámetro de la primera línea especifica la gravedad mínima de las anomalías que se van a trazar.

minSeverity = 0.1


####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

Captura de pantalla del gráfico de resultados de detección de anomalías multivariante

Los trazados muestran los datos sin procesar de los sensores (dentro de la ventana de inferencia) en naranja, verde y azul. Las líneas verticales rojas de la primera ilustración muestran las anomalías detectadas que tienen una gravedad mayor o igual que minSeverity.

En el segundo gráfico se muestra la puntuación de gravedad de todas las anomalías detectadas, con el minSeverity umbral mostrado en la línea roja punteada.

Por último, el último gráfico muestra la contribución de los datos de cada sensor a las anomalías detectadas. Nos ayuda a diagnosticar y comprender la causa más probable de cada anomalía.

Pasos siguientes