Рецепт: диагностическое обслуживание с помощью Cognitive Services для больших данных

В этом рецепте показано, как можно использовать Azure Synapse Analytics и Cognitive Services на Apache Spark для прогнозного обслуживания устройств IoT. Мы будем использовать пример для Cosmos DB и Synapse Link. Чтобы упростить работу, в этом рецепте мы будем считывать данные прямо из CSV-файла, а не получать потоковые данные через Cosmos DB и Synapse Link. Мы настоятельно рекомендуем ознакомиться с примером Synapse Link.

Гипотетический сценарий

Гипотетический сценарий — это электростанция, на которой устройства IoT отслеживают состояние паровых турбин. Коллекция IoTSignals содержит данные о скорости вращения (об/мин) и мощности (МВт) для каждой турбины. Сигналы от паровой турбины анализируются для обнаружения аномальных сигналов.

Данные могут содержать выбросы со случайной частотой. В таких случаях значения скорости вращения будут увеличиваться, а выходная мощность — снижаться для защиты цепи. Идея состоит в том, чтобы видеть изменения данных в одно и то же время, но с разными сигналами.

Предварительные требования

Настройка

Создание ресурса "Детектор аномалий"

Ресурсами Azure, на которые вы подписаны, будет представлено семейство служб Azure Cognitive Services. Создайте ресурс для Переводчика с помощью портала Azure или Azure CLI. Кроме того, вы можете сделать следующее:

Запишите конечную точку и ключ для этого ресурса, они понадобятся вам в этом руководстве.

Введите ключи службы

Начнем с добавления ключа и расположения.

service_key = None # Paste your anomaly detector key here
location = None # Paste your anomaly detector location here

assert (service_key is not None)
assert (location is not None)

Чтение данных в DataFrame

Теперь выполним чтение файла IoTSignals в DataFrame. Откройте новую записную книжку в рабочей области Synapse и создайте DataFrame из файла.

df_signals = spark.read.csv("wasbs://publicwasb@mmlspark.blob.core.windows.net/iot/IoTSignals.csv", header=True, inferSchema=True)

Выполнение обнаружения аномалий с помощью Cognitive Services в Spark

Цель состоит в том, чтобы найти экземпляры, в которых сигналы устройств Интернета вещей выводили аномальные значения, чтобы мы могли увидеть проблему и выполнить прогнозное обслуживание. Чтобы сделать это, давайте воспользуемся детектором аномалий в Spark:

from pyspark.sql.functions import col, struct
from mmlspark.cognitive import SimpleDetectAnomalies
from mmlspark.core.spark import FluentAPI

detector = (SimpleDetectAnomalies()
    .setSubscriptionKey(service_key)
    .setLocation(location)
    .setOutputCol("anomalies")
    .setGroupbyCol("grouping")
    .setSensitivity(95)
    .setGranularity("secondly"))

df_anomaly = (df_signals
    .where(col("unitSymbol") == 'RPM')
    .withColumn("timestamp", col("dateTime").cast("string"))
    .withColumn("value", col("measureValue").cast("double"))
    .withColumn("grouping", struct("deviceId"))
    .mlTransform(detector)).cache()

df_anomaly.createOrReplaceTempView('df_anomaly')

Давайте взглянем на данные:

df_anomaly.select("timestamp","value","deviceId","anomalies.isAnomaly").show(3)

В этой ячейке должен появиться результат, который выглядит следующим образом:

TIMESTAMP value deviceId isAnomaly
2020-05-01 18:33:51 3174 dev-7 Неверно
2020-05-01 18:33:52 2976 dev-7 Неверно
2020-05-01 18:33:53 2714 dev-7 Неверно

Визуализация аномалий для одного из устройств

В файле IoTSignals.csv хранятся сигналы от нескольких устройств IoT. Мы сосредоточимся на конкретном устройстве и визуализируем аномальные выходные данные с устройства.

df_anomaly_single_device = spark.sql("""
select
  timestamp,
  measureValue,
  anomalies.expectedValue,
  anomalies.expectedValue + anomalies.upperMargin as expectedUpperValue,
  anomalies.expectedValue - anomalies.lowerMargin as expectedLowerValue,
  case when anomalies.isAnomaly=true then 1 else 0 end as isAnomaly
from
  df_anomaly
where deviceid = 'dev-1' and timestamp < '2020-04-29'
order by timestamp
limit 200""")

Теперь, когда мы создали кадр данных, который представляет собой аномалии для конкретного устройства, мы можем визуализировать эти аномалии:

import matplotlib.pyplot as plt
from pyspark.sql.functions import col

adf = df_anomaly_single_device.toPandas()
adf_subset = df_anomaly_single_device.where(col("isAnomaly") == 1).toPandas()

plt.figure(figsize=(23,8))
plt.plot(adf['timestamp'],adf['expectedUpperValue'], color='darkred', linestyle='solid', linewidth=0.25, label='UpperMargin')
plt.plot(adf['timestamp'],adf['expectedValue'], color='darkgreen', linestyle='solid', linewidth=2, label='Expected Value')
plt.plot(adf['timestamp'],adf['measureValue'], 'b', color='royalblue', linestyle='dotted', linewidth=2, label='Actual')
plt.plot(adf['timestamp'],adf['expectedLowerValue'],  color='black', linestyle='solid', linewidth=0.25, label='Lower Margin')
plt.plot(adf_subset['timestamp'],adf_subset['measureValue'], 'ro', label = 'Anomaly')
plt.legend()
plt.title('RPM Anomalies with Confidence Intervals')
plt.show()

В случае успеха выходные данные будут выглядеть следующим образом:

График Детектора аномалий

Дальнейшие действия

Узнайте, как выполнить прогнозное обслуживание в большом масштабе с помощью Azure Cognitive Services, Azure Synapse Analytics и Azure CosmosDB. Дополнительные сведения см. в полном примере на GitHub.