Zpracování konfigurovatelných pravidel založených na prahových hodnotách v Azure Stream Analytics

Tento článek popisuje, jak pomocí referenčních dat dosáhnout řešení upozornění, které používá konfigurovatelná pravidla založená na prahových hodnotách v Azure Stream Analytics.

Scénář: Upozorňování na základě upravitelných prahových hodnot pravidel

Možná budete muset vytvořit upozornění jako výstup, když příchozí streamované události dosáhnou určité hodnoty nebo když agregovaná hodnota založená na příchozích streamovaných událostech překročí určitou prahovou hodnotu. Jednoduše nastavíte dotaz Stream Analytics, který porovná hodnotu se statickou prahovou hodnotou, která je pevná a předem určená. Pevná prahová hodnota může být pevně zakódována do syntaxe dotazu streamování pomocí jednoduchých číselných porovnání (větší než, menší než a rovnost).

V některých případech musí být prahové hodnoty snadněji konfigurovatelné, aniž byste museli upravovat syntaxi dotazu při každé změně prahové hodnoty. V jiných případech můžete potřebovat více zařízení nebo uživatelů zpracovaných stejným dotazem, přičemž každý z nich má pro každý typ zařízení jiné prahové hodnoty.

Tento vzor lze použít k dynamické konfiguraci prahových hodnot, selektivnímu výběru typu zařízení, které se prahová hodnota použije, filtrováním vstupních dat a selektivnímu výběru polí, která se mají zahrnout do výstupu.

Použití vstupu referenčních dat do úlohy Stream Analytics jako vyhledávání prahových hodnot upozornění:

  • Uložte prahové hodnoty v referenčních datech, jednu hodnotu na klíč.
  • Připojte události vstupu streamovaných dat k referenčním datům ve sloupci klíče.
  • Jako prahovou hodnotu použijte hodnotu s klíčem z referenčních dat.

Ukázková data a dotaz

V tomto příkladu se výstrahy generují, když agregace dat streamovaných ze zařízení v minutovém intervalu odpovídá zadaným hodnotám v pravidle zadaném jako referenční data.

V dotazu můžete pro každý deviceId a každou metriku metricName v rámci deviceId nakonfigurovat 0 až 5 dimenzí group BY. Seskupí se pouze události, které mají odpovídající hodnoty filtru. Po seskupení se agregace hodnot Minimum, Maximum a Průměr počítají za přeskakující okno po dobu 60 sekund. Filtry agregovaných hodnot se pak počítají podle nakonfigurované prahové hodnoty v odkazu, aby se vygenerovala výstupní událost upozornění.

Předpokládejme například, že existuje úloha Stream Analytics, která má vstup referenčních dat s názvem rules a streamovaným datovým vstupem s názvem metrics.

Referenční data

Tato ukázková referenční data ukazují, jak by mohlo být reprezentováno pravidlo založené na prahové hodnotě. Soubor JSON obsahuje referenční data, uloží se do úložiště objektů blob v Azure a tento kontejner úložiště objektů blob se používá jako vstup referenčních dat s názvem rules. V průběhu času můžete tento soubor JSON přepsat a nahradit konfiguraci pravidla, aniž byste museli zastavit nebo spustit úlohu streamování.

  • Ukázkové pravidlo se používá k reprezentaci upravitelné výstrahy, když procesor překročí (průměr je větší nebo roven) procentu hodnoty 90 . Pole value je možné podle potřeby konfigurovat.
  • Všimněte si, že pravidlo obsahuje pole operátoru , které se později dynamicky interpretuje v syntaxi AVGGREATEROREQUALdotazu .
  • Pravidlo filtruje data pro určitý klíč 2 dimenze pomocí hodnoty C1. Ostatní pole jsou prázdný řetězec, který označuje, že se vstupní stream nemá filtrovat podle těchto polí událostí. Podle potřeby můžete nastavit další pravidla procesoru pro filtrování dalších odpovídajících polí.
  • Do výstupní události upozornění se nemají zahrnout všechny sloupce. V tomto případě je číslo 2 klíče zapnutéTRUE, includedDim aby představovalo, že pole číslo 2 dat událostí v datovém proudu bude zahrnuto do opravňujících výstupních událostí. Ostatní pole nejsou součástí výstupu upozornění, ale seznam polí je možné upravit.
{
    "ruleId": 1234, 
    "deviceId" : "978648", 
    "metricName": "CPU", 
    "alertName": "hot node AVG CPU over 90",
    "operator" : "AVGGREATEROREQUAL",
    "value": 90, 
    "includeDim": {
        "0": "FALSE", 
        "1": "FALSE", 
        "2": "TRUE", 
        "3": "FALSE", 
        "4": "FALSE"
    },
    "filter": {
        "0": "", 
        "1": "",
        "2": "C1", 
        "3": "", 
        "4": ""
    }    
}

Příklad dotazu streamování

Tento ukázkový dotaz Stream Analytics spojí referenční data pravidel z výše uvedeného příkladu se vstupním datovým proudem s názvem metrics.

WITH transformedInput AS
(
    SELECT
        dim0 = CASE rules.includeDim.[0] WHEN 'TRUE' THEN metrics.custom.dimensions.[0].value ELSE NULL END,
        dim1 = CASE rules.includeDim.[1] WHEN 'TRUE' THEN metrics.custom.dimensions.[1].value ELSE NULL END,
        dim2 = CASE rules.includeDim.[2] WHEN 'TRUE' THEN metrics.custom.dimensions.[2].value ELSE NULL END,
        dim3 = CASE rules.includeDim.[3] WHEN 'TRUE' THEN metrics.custom.dimensions.[3].value ELSE NULL END,
        dim4 = CASE rules.includeDim.[4] WHEN 'TRUE' THEN metrics.custom.dimensions.[4].value ELSE NULL END,
        metric = metrics.metric.value,
        metricName = metrics.metric.name,
        deviceId = rules.deviceId, 
        ruleId = rules.ruleId, 
        alertName = rules.alertName,
        ruleOperator = rules.operator, 
        ruleValue = rules.value
    FROM 
        metrics
        timestamp by eventTime
    JOIN 
        rules
        ON metrics.deviceId = rules.deviceId AND metrics.metric.name = rules.metricName
    WHERE
        (rules.filter.[0] = '' OR metrics.custom.filters.[0].value = rules.filter.[0]) AND 
        (rules.filter.[1] = '' OR metrics.custom.filters.[1].value = rules.filter.[1]) AND
        (rules.filter.[2] = '' OR metrics.custom.filters.[2].value = rules.filter.[2]) AND
        (rules.filter.[3] = '' OR metrics.custom.filters.[3].value = rules.filter.[3]) AND
        (rules.filter.[4] = '' OR metrics.custom.filters.[4].value = rules.filter.[4])
)

SELECT
    System.Timestamp as time, 
    transformedInput.deviceId as deviceId,
    transformedInput.ruleId as ruleId,
    transformedInput.metricName as metric,
    transformedInput.alertName as alert,
    AVG(metric) as avg,
    MIN(metric) as min, 
    MAX(metric) as max, 
    dim0, dim1, dim2, dim3, dim4
FROM
    transformedInput
GROUP BY
    transformedInput.deviceId,
    transformedInput.ruleId,
    transformedInput.metricName,
    transformedInput.alertName,
    dim0, dim1, dim2, dim3, dim4,
    ruleOperator, 
    ruleValue, 
    TumblingWindow(second, 60)
HAVING
    (
        (ruleOperator = 'AVGGREATEROREQUAL' AND avg(metric) >= ruleValue) OR
        (ruleOperator = 'AVGEQUALORLESS' AND avg(metric) <= ruleValue) 
    )

Příklad streamování vstupních dat událostí

Tato ukázková data JSON představují vstupní data metrik , která se používají ve výše uvedeném dotazu na streamování.

  • Tři ukázkové události jsou uvedeny v časovém rozsahu 1 minuta, hodnota T14:50.
  • Všechny tři mají stejnou deviceId hodnotu 978648.
  • Hodnoty metrik procesoru se liší v jednotlivých událostech , 989580 v uvedeném pořadí. Pouze první dvě ukázkové události překračují pravidlo upozornění procesoru vytvořené v pravidle.
  • Pole includeDim v pravidlu upozornění bylo klíčové číslo 2. Odpovídající pole klíče 2 v ukázkových událostech má název NodeName. Tři ukázkové události mají hodnoty N024, N024a N014 . Ve výstupu uvidíte pouze uzel N024 , protože to jsou jediná data, která splňují kritéria upozornění pro vysoké využití procesoru. N014 nesplňuje vysokou prahovou hodnotu procesoru.
  • Pravidlo upozornění je nakonfigurované pouze s klíčem s číslem filter 2, které odpovídá cluster poli v ukázkových událostech. Všechny tři ukázkové události mají hodnotu C1 a odpovídají kritériím filtru.
{
    "eventTime": "2018-04-30T14:50:23.1324132Z",
    "deviceId": "978648",
    "custom": {
        "dimensions": {
            "0": {
                "name": "NodeType",
                "value": "N1"
            },
            "1": {
                "name": "Cluster",
                "value": "C1"
            },
            "2": {
                "name": "NodeName",
                "value": "N024"
            }
        },
        "filters": {
            "0": {
                "name": "application",
                "value": "A1"
            },
            "1": {
                "name": "deviceType",
                "value": "T1"
            },
            "2": {
                "name": "cluster",
                "value": "C1"
            },
            "3": {
                "name": "nodeType",
                "value": "N1"
            }
        }
    },
    "metric": {
        "name": "CPU",
        "value": 98,
        "count": 1.0,
        "min": 98,
        "max": 98,
        "stdDev": 0.0
    }
}
{
    "eventTime": "2018-04-30T14:50:24.1324138Z",
    "deviceId": "978648",
    "custom": {
        "dimensions": {
            "0": {
                "name": "NodeType",
                "value": "N2"
            },
            "1": {
                "name": "Cluster",
                "value": "C1"
            },
            "2": {
                "name": "NodeName",
                "value": "N024"
            }
        },
        "filters": {
            "0": {
                "name": "application",
                "value": "A1"
            },
            "1": {
                "name": "deviceType",
                "value": "T1"
            },
            "2": {
                "name": "cluster",
                "value": "C1"
            },
            "3": {
                "name": "nodeType",
                "value": "N2"
            }
        }
    },
    "metric": {
        "name": "CPU",
        "value": 95,
        "count": 1,
        "min": 95,
        "max": 95,
        "stdDev": 0
    }
}
{
    "eventTime": "2018-04-30T14:50:37.1324130Z",
    "deviceId": "978648",
    "custom": {
        "dimensions": {
            "0": {
                "name": "NodeType",
                "value": "N3"
            },
            "1": {
                "name": "Cluster",
                "value": "C1 "
            },
            "2": {
                "name": "NodeName",
                "value": "N014"
            }
        },
        "filters": {
            "0": {
                "name": "application",
                "value": "A1"
            },
            "1": {
                "name": "deviceType",
                "value": "T1"
            },
            "2": {
                "name": "cluster",
                "value": "C1"
            },
            "3": {
                "name": "nodeType",
                "value": "N3"
            }
        }
    },
    "metric": {
        "name": "CPU",
        "value": 80,
        "count": 1,
        "min": 80,
        "max": 80,
        "stdDev": 0
    }
}

Příklad výstupu

Tento příklad výstupních dat JSON ukazuje vytvoření jedné události upozornění na základě pravidla prahové hodnoty procesoru definovaného v referenčních datech. Výstupní událost obsahuje název upozornění a agregovaná pole (průměr, minimum, maximum). Výstupní data události obsahují kvůli konfiguraci pravidla hodnotu N024 klíče pole s číslem 2NodeName. (Kód JSON byl změněn tak, aby zobrazoval konce řádků pro lepší čitelnost.)

{"time":"2018-05-01T02:03:00.0000000Z","deviceid":"978648","ruleid":1234,"metric":"CPU",
"alert":"hot node AVG CPU over 90","avg":96.5,"min":95.0,"max":98.0,
"dim0":null,"dim1":null,"dim2":"N024","dim3":null,"dim4":null}