Azure 流分析在雲和 Azure IoT Edge 中均可用,提供內置的基於機器學習的異常檢測功能,可用於監視兩種最常見的異常:臨時和持久異常。 使用 AnomalyDetection_SpikeAndDip 和 AnomalyDetection_ChangePoint 函數,可以直接在流分析作業中執行異常情況檢測。
機器學習模型採用統一採樣的時間序列。 如果時間序列不統一,您可以在調用異常檢測之前插入帶有滾動視窗的聚合步驟。
機器學習作目前不支援季節性趨勢或多變數相關性。
在 Azure 流分析中使用機器學習進行異常檢測
以下視頻演示了如何使用 Azure 流分析中的機器學習函數實時檢測異常。
模型行為
通常,滑動視窗中的數據越多,模型的準確性就越高。 指定滑動視窗中的數據被視為該時間範圍內其正常值範圍的一部分。 該模型僅考慮滑動視窗上的事件歷史記錄,以檢查當前事件是否異常。 當滑動視窗移動時,舊值將從模型的訓練中逐出。
這些函數的運作方式是根據他們目前所看到的建立某種常態。 通過在置信度內與已建立的正常值進行比較來識別異常值。 視窗大小應基於訓練模型實現正常行為所需的最小事件數,以便在發生異常時能夠識別它。
模型的回應時間隨著歷史記錄大小的增加而增加,因為它需要與過去更多事件進行比較。 我們建議您僅包含必要的事件數以獲得更好的性能。
時間序列中的間隙可能是由於模型在某些時間點未接收事件造成的。 這種情況由流分析使用插補邏輯處理。 同一滑動視窗的歷史記錄大小以及持續時間用於計算事件預期到達的平均速率。
此處提供的異常生成器可用於向IoT中心提供具有不同異常模式的數據。 可以使用這些異常檢測函數設置 Azure 流分析作業,以便從此 IoT 中心讀取數據並檢測異常。
Spike 和 Dip
時間序列事件流中的臨時異常稱為峰值和低谷。 可以使用基於機器學習的運算子 AnomalyDetection_SpikeAndDip 來監控峰值和低谷。
在同一滑動視窗中,如果第二個峰值小於第一個峰值,則與指定置信度內第一個峰值的分數相比,較小峰值的計算分數可能不夠顯著。 您可以嘗試降低模型的置信度來檢測此類異常。 但是,如果您開始收到過多的警報,則可以使用更高的置信區間。
以下示例查詢假定在 2 分鐘滑動視窗中的統一輸入速率為每秒 1 個事件,歷史記錄為 120 個事件。 最終的 SELECT 語句提取並輸出置信度為 95%的分數和異常狀態。
WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME AS time,
CAST(temperature AS float) AS temp,
AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 120, 'spikesanddips')
OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
FROM input
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS float) AS
SpikeAndDipScore,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS bigint) AS
IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep
更改點
時間序列事件流中的持續異常是事件流中值分佈的變化,例如級別變化和趨勢。 在流分析中,使用基於機器學習的 AnomalyDetection_ChangePoint 運算符來檢測此類異常。
持續的變化比高峰和低谷持續的時間長得多,可能預示著災難性事件。 持續性變化通常肉眼不可見,但可以使用 AnomalyDetection_ChangePoint 運算符檢測到。
下圖是等級更改的範例:
下圖是趨勢變化的範例:
以下示例查詢假定在 20 分鐘滑動視窗中的統一輸入速率為每秒一個事件,歷史記錄大小為 1,200 個事件。 最終的 SELECT 語句提取並輸出置信度為 80%的分數和異常狀態。
WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME AS time,
CAST(temperature AS float) AS temp,
AnomalyDetection_ChangePoint(CAST(temperature AS float), 80, 1200)
OVER(LIMIT DURATION(minute, 20)) AS ChangePointScores
FROM input
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') AS float) AS
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') AS bigint) AS
IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep
效能特性
這些模型的性能取決於歷史記錄大小、視窗持續時間、事件負載以及是否使用函數級分區。 本節討論這些配置,並提供有關如何維持每秒 1 K、5 K 和 10K 事件的攝取速率的示例。
- 歷史記錄大小 - 這些模型隨 歷史記錄大小線性執行。 歷史記錄大小越長,模型對新事件進行評分所需的時間就越長。 這是因為模型將新事件與歷史記錄緩衝區中的每個過去事件進行比較。
- Window duration (視窗持續時間 ) - Window duration (視窗持續時間 ) 應反映接收歷史記錄大小指定的事件數量所需的時間。 如果視窗中沒有那麼多事件,Azure 流分析將插補缺失值。 因此,CPU 消耗是歷史大小的函數。
- 事件負載 - 事件負載越大,模型執行的工作就越多,這會影響 CPU 消耗。 可以通過使作業處於令人尷尬的並行狀態來橫向擴展作業,前提是業務邏輯使用更多輸入分區是有意義的。
-
函數級分區 - 函數級別分區 是通過在
PARTITION BY異常檢測函數調用中使用來完成的。 這種類型的分區會增加開銷,因為需要同時為多個模型維護狀態。 函數級分區用於設備級分區等場景。
關係
歷史記錄大小、視窗持續時間和總事件負載按以下方式相關:
windowDuration(以毫秒為單位)= 1000 * historySize /(每秒輸入事件總數 / 輸入分區計數)
當按 deviceId 對函數進行分區時,在異常檢測函數調用中添加 「PARTITION BY deviceId」。
觀測
下表包括未分區情況下單個節點(六個 SU)的輸送量觀察值:
| 歷史記錄大小(事件) | 視窗持續時間 (ms) | 每秒輸入事件總數 |
|---|---|---|
| 六十 | 55 | 2,200 |
| 600 | 728 | 1,650 |
| 6,000 | 10,910 | 1,100 |
下表包括分區情況下單個節點 (6 個 SU) 的輸送量觀察值:
| 歷史記錄大小(事件) | 視窗持續時間 (ms) | 每秒輸入事件總數 | 裝置計數 |
|---|---|---|---|
| 六十 | 1,091 | 1,100 | 10 |
| 600 | 10,910 | 1,100 | 10 |
| 6,000 | 218,182 | <550 | 10 |
| 六十 | 21,819 | 550 | 100 |
| 600 | 218,182 | 550 | 100 |
| 6,000 | 2,181,819 | <550 | 100 |
運行上述非分區配置的範例代碼位於 Azure 示例的大規模 流式處理存儲庫 中。 該代碼創建一個沒有函數級分區的流分析作業,該作業使用事件中心作為輸入和輸出。 輸入負載是使用測試用戶端生成的。 每個輸入事件都是一個 1 KB 的 json 文件。 事件模擬發送 JSON 資料的 IoT 裝置(最多 1 K 台設備)。 歷史記錄大小、視窗持續時間和總事件負載在兩個輸入分區上有所不同。
備註
如需更精確的評估,請自訂範例以符合您的案例。
識別瓶頸
要確定管道中的瓶頸,請使用 Azure 流分析作業中的 Metrics (指標) 窗格。 檢閱 [輸入/輸出事件] 來了解輸送量,並檢閱「浮水印延遲」 \(英文\) 或 [待處理的事件] 來查看作業是否能與輸入速率保持一致。 針對事件中樞計量,請尋找 [節流的要求],並據以調整閾值單位。 針對 Azure Cosmos DB 計量,檢閱 [輸送量] 底下的 [每個分割區索引鍵範圍每秒取用的 RU 上限],以確保系統會一致地取用您的分割區索引鍵範圍。 針對 Azure SQL DB,請監視 [記錄 IO] 和 [CPU]。