Рецепт. Cognitive Services — обнаружение многопараметрической аномалии
В этом рецепте показано, как использовать SynapseML и Azure Cognitive Services в Apache Spark для обнаружения многомерных аномалий. Многопараметрическое обнаружение аномалий позволяет обнаруживать аномалии между множеством переменных или временных рядов с учетом всех взаимосвязей и зависимостей между разными переменными. В этом сценарии мы используем SynapseML для обучения модели для обнаружения многомерных аномалий с помощью Azure Cognitive Services, а затем используем для модели для вывода многомерных аномалий в наборе данных, содержающем искусственные измерения с трех датчиков Интернета вещей.
Дополнительные сведения о Детектор аномалий Cognitive Service см. на этой странице документации.
Предварительные требования
- подписка Azure — создайте бесплатную учетную запись.
- Вложите записную книжку в lakehouse. В левой части щелкните Добавить , чтобы добавить существующий lakehouse или создать lakehouse.
Настройка
Создание ресурса "Детектор аномалий"
Следуйте инструкциям, чтобы создать Anomaly Detector
ресурс с помощью портал Azure или вы также можете использовать Azure CLI для создания этого ресурса.
- В портал Azure щелкните группу
Create
ресурсов и введитеAnomaly Detector
. Щелкните ресурс Детектор аномалий. - Присвойте ресурсу имя и в идеале используйте тот же регион, что и остальная часть группы ресурсов. Используйте параметры по умолчанию для остальных, а затем щелкните
Review + Create
и затемCreate
. - После создания ресурса Детектор аномалий откройте его и щелкните
Keys and Endpoints
панель слева. Скопируйте ключ для ресурса Детектор аномалий вANOMALY_API_KEY
переменную среды или сохраните его в переменнойanomalyKey
.
Создание ресурса учетной записи хранения
Чтобы сохранить промежуточные данные, необходимо создать учетную запись Хранилище BLOB-объектов Azure. В этой учетной записи хранения создайте контейнер для хранения промежуточных данных. Запишите имя контейнера и скопируйте строку подключения в этот контейнер. Он понадобится позже, чтобы заполнить containerName
переменную и переменную BLOB_CONNECTION_STRING
среды.
Введите ключи службы
Начнем с настройки переменных среды для ключей служб. Следующая ячейка ANOMALY_API_KEY
задает переменные среды и BLOB_CONNECTION_STRING
на основе значений, хранящихся в Key Vault Azure. Если вы работаете с этим руководством в собственной среде, убедитесь, что вы установили эти переменные среды, прежде чем продолжить.
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Теперь давайте считаем ANOMALY_API_KEY
переменные среды и BLOB_CONNECTION_STRING
и задали containerName
переменные и location
.
# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
"wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"
Сначала мы подключимся к учетной записи хранения, чтобы детектор аномалий смог сохранить в ней промежуточные результаты:
spark.sparkContext._jsc.hadoopConfiguration().set(
f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)
Давайте импортируем все необходимые модули.
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.cognitive import *
Теперь давайте считаем наши примеры данных в кадр данных Spark.
df = (
spark.read.format("csv")
.option("header", "true")
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)
df = (
df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)
# Let's inspect the dataframe:
df.show(5)
Теперь можно создать estimator
объект , который используется для обучения модели. Мы указываем время начала и окончания для обучающих данных. Мы также указываем используемые входные столбцы и имя столбца, содержащего метки времени. Наконец, мы указываем количество точек данных, используемых в скользящем окне обнаружения аномалий, и задаем строку подключения для учетной записи Хранилище BLOB-объектов Azure.
trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
estimator = (
FitMultivariateAnomaly()
.setSubscriptionKey(anomalyKey)
.setLocation(location)
.setStartTime(trainingStartTime)
.setEndTime(trainingEndTime)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
)
Теперь, когда мы создали estimator
, давайте вместим его в данные:
model = estimator.fit(df)
После завершения обучения мы можем использовать модель для вывода. Код в следующей ячейке указывает время начала и окончания для данных, в котором мы хотим обнаружить аномалии.
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"
result = (
model.setStartTime(inferenceStartTime)
.setEndTime(inferenceEndTime)
.setOutputCol("results")
.setErrorCol("errors")
.setInputCols(inputColumns)
.setTimestampCol(timestampColumn)
.transform(df)
)
result.show(5)
При вызове .show(5)
в предыдущей ячейке были показаны первые пять строк в кадре данных. Результаты были все null
потому, что они не находились в окне вывода.
Чтобы отобразить результаты только для выведенных данных, выберите нужные столбцы. Затем можно упорядочить строки в кадре данных по возрастанию и отфильтровать результат, чтобы отобразить только те строки, которые находятся в диапазоне окна вывода. В нашем случае inferenceEndTime
это то же самое, что и последняя строка в кадре данных, поэтому это можно игнорировать.
Наконец, чтобы лучше отобразить результаты, давайте преобразуем кадр данных Spark в кадр данных Pandas.
rdf = (
result.select(
"timestamp",
*inputColumns,
"results.contributors",
"results.isAnomaly",
"results.severity"
)
.orderBy("timestamp", ascending=True)
.filter(col("timestamp") >= lit(inferenceStartTime))
.toPandas()
)
rdf
Теперь отформатируем contributors
столбец, в котором хранится оценка вклада каждого датчика в обнаруженные аномалии. Следующая ячейка форматирует эти данные и разделяет оценку вклада каждого датчика на собственный столбец.
def parse(x):
if type(x) is list:
return dict([item[::-1] for item in x])
else:
return {"series_0": 0, "series_1": 0, "series_2": 0}
rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
[rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf
Отлично! Теперь у нас есть оценки вкладов датчиков 1, 2 и 3 в series_0
столбцах , series_1
и series_2
соответственно.
Давайте запустим следующую ячейку, чтобы отобразить результаты. Параметр minSeverity
в первой строке указывает минимальную серьезность аномалий для построения.
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
rdf["timestamp"],
rdf["sensor_1"],
color="tab:orange",
linestyle="solid",
linewidth=2,
label="sensor_1",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_2"],
color="tab:green",
linestyle="solid",
linewidth=2,
label="sensor_2",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_3"],
color="tab:blue",
linestyle="solid",
linewidth=2,
label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)
plt.legend()
plt.title(
"A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
rdf["timestamp"],
rdf["severity"],
color="black",
linestyle="solid",
linewidth=2,
label="Severity score",
)
plt.plot(
rdf["timestamp"],
[minSeverity] * len(rdf["severity"]),
color="red",
linestyle="dotted",
linewidth=1,
label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
rdf["timestamp"],
rdf["series_1"],
width=2,
color="tab:green",
label="sensor_2",
bottom=rdf["series_0"],
)
plt.bar(
rdf["timestamp"],
rdf["series_2"],
width=2,
color="tab:blue",
label="sensor_3",
bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
На диаграммах показаны необработанные данные с датчиков (в окне вывода) оранжевым, зеленым и синим цветом. Красные вертикальные линии на первом рисунке показывают обнаруженные аномалии, серьезность которых больше или равна minSeverity
.
На втором графике показана оценка серьезности всех обнаруженных аномалий с пороговым значением minSeverity
, показанным в пунктирной красной линии.
Наконец, на последнем графике показан вклад данных с каждого датчика в обнаруженные аномалии. Это помогает нам диагностировать и понять наиболее вероятную причину каждой аномалии.