Wykrywanie anomalii w usłudze Azure Stream Analytics

Usługa Azure Stream Analytics dostępna zarówno w chmurze, jak i w usłudze Azure IoT Edge oferuje wbudowane funkcje wykrywania anomalii oparte na uczeniu maszynowym, które mogą służyć do monitorowania dwóch najczęściej występujących anomalii: tymczasowych i trwałych. Funkcje AnomalyDetection_SpikeAndDip i AnomalyDetection_ChangePoint umożliwiają wykrywanie anomalii bezpośrednio w zadaniu usługi Stream Analytics.

Modele uczenia maszynowego zakładają jednolite próbkowanie szeregów czasowych. Jeśli szereg czasowy nie jest jednolity, możesz wstawić krok agregacji z oknem wirowania przed wywołaniem wykrywania anomalii.

Obecnie operacje uczenia maszynowego nie obsługują trendów sezonowości ani korelacji wielowariancji.

Wykrywanie anomalii przy użyciu uczenia maszynowego w usłudze Azure Stream Analytics

W poniższym filmie wideo pokazano, jak wykryć anomalię w czasie rzeczywistym przy użyciu funkcji uczenia maszynowego w usłudze Azure Stream Analytics.

Zachowanie modelu

Ogólnie rzecz biorąc, dokładność modelu poprawia się przy użyciu większej ilości danych w oknie przewijania. Dane w określonym oknie przewijania są traktowane jako część normalnego zakresu wartości dla tego przedziału czasu. Model uwzględnia tylko historię zdarzeń w oknie przewijania, aby sprawdzić, czy bieżące zdarzenie jest nietypowe. W miarę przesuwania okna stare wartości są eksmitowane z trenowania modelu.

Funkcje działają, ustanawiając pewną normalność na podstawie tego, co widzieli do tej pory. Wartości odstające są identyfikowane przez porównanie z ustalonym poziomem ufności. Rozmiar okna powinien być oparty na minimalnych zdarzeniach wymaganych do wytrenowania modelu pod kątem normalnego zachowania, aby w przypadku wystąpienia anomalii można było go rozpoznać.

Czas odpowiedzi modelu zwiększa się wraz z rozmiarem historii, ponieważ musi być porównywany z większą liczbą przeszłych zdarzeń. Zaleca się uwzględnienie tylko niezbędnej liczby zdarzeń w celu uzyskania lepszej wydajności.

Luki w szeregach czasowych mogą być wynikiem braku odbierania zdarzeń w określonych punktach w czasie przez model. Ta sytuacja jest obsługiwana przez usługę Stream Analytics przy użyciu logiki imputacji. Rozmiar historii, a także czas trwania, dla tego samego okna przewijania jest używany do obliczania średniej szybkości, z jaką zdarzenia mają pochodzić.

Generator anomalii dostępny tutaj może służyć do podawania centrum IoT z danymi o różnych wzorcach anomalii. Zadanie usługi ASA można skonfigurować przy użyciu tych funkcji wykrywania anomalii w celu odczytu z tego centrum IoT Hub i wykrywania anomalii.

Skok i spadek

Tymczasowe anomalie w strumieniu zdarzeń szeregów czasowych są nazywane skokami i spadkami. Skoki i spadki można monitorować za pomocą operatora opartego na uczeniu maszynowym AnomalyDetection_SpikeAndDip.

Przykład anomalii skokowej i spadkowej

W tym samym oknie przesuwnym, jeśli drugi skok jest mniejszy niż pierwszy, obliczony wynik dla mniejszego skoku prawdopodobnie nie jest wystarczająco znaczący w porównaniu do wyniku dla pierwszego skoku w określonym poziomie ufności. Możesz spróbować zmniejszyć poziom ufności modelu, aby wykryć takie anomalie. Jeśli jednak zaczniesz otrzymywać zbyt wiele alertów, możesz użyć większego interwału ufności.

Poniższe przykładowe zapytanie zakłada jednolitą szybkość wprowadzania jednego zdarzenia na sekundę w 2-minutowym oknie przewijania z historią 120 zdarzeń. Końcowa instrukcja SELECT wyodrębnia i generuje wynik oraz stan anomalii z poziomem ufności 95%.

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 120, 'spikesanddips')
            OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS float) AS
    SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS bigint) AS
    IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep

Punkt zmiany

Trwałe anomalie w strumieniu zdarzeń szeregów czasowych to zmiany rozkładu wartości w strumieniu zdarzeń, takie jak zmiany na poziomie i trendy. W usłudze Stream Analytics takie anomalie są wykrywane przy użyciu operatora AnomalyDetection_ChangePoint opartego na uczeniu maszynowym.

Trwałe zmiany trwają znacznie dłużej niż skoki i spadki i mogą wskazywać na katastrofalne zdarzenia. Trwałe zmiany nie są zwykle widoczne dla nagiego oka, ale można je wykryć za pomocą operatora AnomalyDetection_ChangePoint .

Na poniższej ilustracji przedstawiono przykład zmiany poziomu:

Przykład anomalii dotyczącej zmian na poziomie

Na poniższej ilustracji przedstawiono przykład zmiany trendu:

Przykład anomalii zmiany trendu

Poniższe przykładowe zapytanie zakłada jednolitą szybkość wprowadzania jednego zdarzenia na sekundę w 20-minutowym oknie przewijania o rozmiarze historii wynoszącym 1200 zdarzeń. Końcowa instrukcja SELECT wyodrębnia i generuje wynik oraz stan anomalii z poziomem ufności 80%.

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_ChangePoint(CAST(temperature AS float), 80, 1200) 
        OVER(LIMIT DURATION(minute, 20)) AS ChangePointScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(ChangePointScores, 'Score') AS float) AS
    ChangePointScore,
    CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') AS bigint) AS
    IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep

Charakterystyka wydajności

Wydajność tych modeli zależy od rozmiaru historii, czasu trwania okna, obciążenia zdarzeń i tego, czy jest używane partycjonowanie na poziomie funkcji. W tej sekcji omówiono te konfiguracje i przedstawiono przykłady dotyczące sposobu utrzymania współczynników pozyskiwania zdarzeń 1K, 5K i 10 000 na sekundę.

  • Rozmiar historii — te modele działają liniowo z rozmiarem historii. Dłuższy rozmiar historii, tym dłużej modele zajmują ocenę nowego zdarzenia. Wynika to z tego, że modele porównują nowe zdarzenie z każdym z poprzednich zdarzeń w buforze historii.
  • Czas trwania oknaczas trwania okna powinien odzwierciedlać czas odbierania jak największej liczby zdarzeń określonych przez rozmiar historii. Bez tego wielu zdarzeń w oknie usługa Azure Stream Analytics będzie imputować brakujące wartości. W związku z tym użycie procesora CPU jest funkcją rozmiaru historii.
  • Obciążenie zdarzeniami — im większe obciążenie zdarzeń, tym większa praca wykonywana przez modele, co ma wpływ na użycie procesora CPU. Zadanie można skalować w poziomie, czyniąc je żenująco równoległym, zakładając, że logika biznesowa ma sens, aby używać większej liczby partycji wejściowych.
  • Partycjonowanie - na poziomie funkcji Partycjonowanie na poziomie funkcji odbywa się za pomocą PARTITION BY wywołania funkcji wykrywania anomalii. Ten typ partycjonowania dodaje obciążenie, ponieważ stan musi być utrzymywany dla wielu modeli w tym samym czasie. Partycjonowanie na poziomie funkcji jest używane w scenariuszach, takich jak partycjonowanie na poziomie urządzenia.

Relacja

Rozmiar historii, czas trwania okna i łączne obciążenie zdarzeń są powiązane w następujący sposób:

windowDuration (w ms) = 1000 * historySize / (łączna liczba zdarzeń wejściowych na sekundę / Liczba partycji wejściowych)

Podczas partycjonowania funkcji według identyfikatora deviceId dodaj element "PARTITION BY deviceId" do wywołania funkcji wykrywania anomalii.

Obserwacje

Poniższa tabela zawiera obserwacje przepływności dla pojedynczego węzła (6 jednostek przesyłania danych) dla przypadku bez partycjonowania:

Rozmiar historii (zdarzenia) Czas trwania okna (ms) Łączna liczba zdarzeń wejściowych na sekundę
60 55 2200
600 728 1,650
6000 10,910 1,100

Poniższa tabela zawiera obserwacje przepływności dla pojedynczego węzła (6 jednostek przesyłania danych) dla przypadku partycjonowanego:

Rozmiar historii (zdarzenia) Czas trwania okna (ms) Łączna liczba zdarzeń wejściowych na sekundę Liczba urządzeń
60 1,091 1,100 10
600 10,910 1,100 10
6000 218,182 <550 10
60 21,819 550 100
600 218,182 550 100
6000 2,181,819 <550 100

Przykładowy kod do uruchamiania konfiguracji bez partycji powyżej znajduje się w repozytorium Streaming At Scale przykładów platformy Azure. Kod tworzy zadanie analizy strumienia bez partycjonowania na poziomie funkcji, które używa usługi Event Hubs jako danych wejściowych i wyjściowych. Obciążenie wejściowe jest generowane przy użyciu klientów testowych. Każde zdarzenie wejściowe jest dokumentem JSON o wartości 1 KB. Zdarzenia symulują urządzenie IoT wysyłające dane JSON (maksymalnie 1K urządzeń). Rozmiar historii, czas trwania okna i łączne obciążenie zdarzeń są zróżnicowane w ponad 2 partycje wejściowe.

Uwaga

Aby uzyskać dokładniejsze oszacowanie, dostosuj przykłady, aby pasowały do danego scenariusza.

Identyfikowanie wąskich gardeł

Użyj okienka Metryki w zadaniu usługi Azure Stream Analytics, aby zidentyfikować wąskie gardła w potoku. Przejrzyj zdarzenia wejściowe/wyjściowe pod kątem przepływności i "Opóźnienie limitu" lub Zdarzenia zaległe , aby sprawdzić, czy zadanie nadąża za szybkością wprowadzania. W przypadku metryk centrum zdarzeń poszukaj żądań ograniczonych i odpowiednio dostosuj jednostki progowe. W przypadku metryk usługi Azure Cosmos DB zapoznaj się z artykułem Maksymalna liczba jednostek RU/s na zakres kluczy partycji w obszarze Przepływność, aby upewnić się, że zakresy kluczy partycji są równomiernie używane. W przypadku bazy danych Azure SQL monitorowanie operacji we/wy dzienników i procesora CPU.

Następne kroki