Рецепт. Cognitive Services — обнаружение многопараметрической аномалии

В этом рецепте показано, как использовать SynapseML и Azure Cognitive Services в Apache Spark для обнаружения многомерных аномалий. Многопараметрическое обнаружение аномалий позволяет обнаруживать аномалии между множеством переменных или временных рядов с учетом всех взаимосвязей и зависимостей между разными переменными. В этом сценарии мы используем SynapseML для обучения модели для обнаружения многомерных аномалий с помощью Azure Cognitive Services, а затем используем для модели для вывода многомерных аномалий в наборе данных, содержающем искусственные измерения с трех датчиков Интернета вещей.

Дополнительные сведения о Детектор аномалий Cognitive Service см. на этой странице документации.

Предварительные требования

Настройка

Создание ресурса "Детектор аномалий"

Следуйте инструкциям, чтобы создать Anomaly Detector ресурс с помощью портал Azure или вы также можете использовать Azure CLI для создания этого ресурса.

  • В портал Azure щелкните группу Create ресурсов и введите Anomaly Detector. Щелкните ресурс Детектор аномалий.
  • Присвойте ресурсу имя и в идеале используйте тот же регион, что и остальная часть группы ресурсов. Используйте параметры по умолчанию для остальных, а затем щелкните Review + Create и затем Create.
  • После создания ресурса Детектор аномалий откройте его и щелкните Keys and Endpoints панель слева. Скопируйте ключ для ресурса Детектор аномалий в ANOMALY_API_KEY переменную среды или сохраните его в переменной anomalyKey .

Создание ресурса учетной записи хранения

Чтобы сохранить промежуточные данные, необходимо создать учетную запись Хранилище BLOB-объектов Azure. В этой учетной записи хранения создайте контейнер для хранения промежуточных данных. Запишите имя контейнера и скопируйте строку подключения в этот контейнер. Он понадобится позже, чтобы заполнить containerName переменную и переменную BLOB_CONNECTION_STRING среды.

Введите ключи службы

Начнем с настройки переменных среды для ключей служб. Следующая ячейка ANOMALY_API_KEY задает переменные среды и BLOB_CONNECTION_STRING на основе значений, хранящихся в Key Vault Azure. Если вы работаете с этим руководством в собственной среде, убедитесь, что вы установили эти переменные среды, прежде чем продолжить.

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Теперь давайте считаем ANOMALY_API_KEY переменные среды и BLOB_CONNECTION_STRING и задали containerName переменные и location .

# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

Сначала мы подключимся к учетной записи хранения, чтобы детектор аномалий смог сохранить в ней промежуточные результаты:

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

Давайте импортируем все необходимые модули.

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.cognitive import *

Теперь давайте считаем наши примеры данных в кадр данных Spark.

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

Теперь можно создать estimator объект , который используется для обучения модели. Мы указываем время начала и окончания для обучающих данных. Мы также указываем используемые входные столбцы и имя столбца, содержащего метки времени. Наконец, мы указываем количество точек данных, используемых в скользящем окне обнаружения аномалий, и задаем строку подключения для учетной записи Хранилище BLOB-объектов Azure.

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

Теперь, когда мы создали estimator, давайте вместим его в данные:

model = estimator.fit(df)

После завершения обучения мы можем использовать модель для вывода. Код в следующей ячейке указывает время начала и окончания для данных, в котором мы хотим обнаружить аномалии.

inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

При вызове .show(5) в предыдущей ячейке были показаны первые пять строк в кадре данных. Результаты были все null потому, что они не находились в окне вывода.

Чтобы отобразить результаты только для выведенных данных, выберите нужные столбцы. Затем можно упорядочить строки в кадре данных по возрастанию и отфильтровать результат, чтобы отобразить только те строки, которые находятся в диапазоне окна вывода. В нашем случае inferenceEndTime это то же самое, что и последняя строка в кадре данных, поэтому это можно игнорировать.

Наконец, чтобы лучше отобразить результаты, давайте преобразуем кадр данных Spark в кадр данных Pandas.

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

Теперь отформатируем contributors столбец, в котором хранится оценка вклада каждого датчика в обнаруженные аномалии. Следующая ячейка форматирует эти данные и разделяет оценку вклада каждого датчика на собственный столбец.

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}


rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

Отлично! Теперь у нас есть оценки вкладов датчиков 1, 2 и 3 в series_0столбцах , series_1и series_2 соответственно.

Давайте запустим следующую ячейку, чтобы отобразить результаты. Параметр minSeverity в первой строке указывает минимальную серьезность аномалий для построения.

minSeverity = 0.1


####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

Снимок экрана: график результатов многопараметрического обнаружения аномалий

На диаграммах показаны необработанные данные с датчиков (в окне вывода) оранжевым, зеленым и синим цветом. Красные вертикальные линии на первом рисунке показывают обнаруженные аномалии, серьезность которых больше или равна minSeverity.

На втором графике показана оценка серьезности всех обнаруженных аномалий с пороговым значением minSeverity , показанным в пунктирной красной линии.

Наконец, на последнем графике показан вклад данных с каждого датчика в обнаруженные аномалии. Это помогает нам диагностировать и понять наиболее вероятную причину каждой аномалии.

Дальнейшие действия