Ukázky pro dotazy Kusto

Tento článek popisuje běžné dotazy a způsob použití dotazovací jazyk Kusto k jejich splnění.

Zobrazení sloupcového grafu

Pokud chcete promítnout dva nebo více sloupců a potom je použít jako osu X a y grafu:

StormEvents
| where isnotempty(EndLocation)
| summarize event_count=count() by EndLocation
| top 10 by event_count
| render columnchart
  • První sloupec tvoří osu X. Může to být číslo, datum a čas nebo řetězec.
  • Pomocí where, summarizea top omezte objem zobrazených dat.
  • Seřaďte výsledky a definujte pořadí osy x.

Snímek obrazovky se sloupcovým grafem s deseti barevnými sloupci, které znázorňují příslušné hodnoty 10 míst

Získání relací z událostí Start a Stop

V protokolu událostí některé události označují začátek nebo konec rozšířené aktivity nebo relace.

Name City (Město) SessionId Timestamp
Spustit Londýn 2817330 2015-12-09T10:12:02.32
Game (Hra) Londýn 2817330 2015-12-09T10:12:52.45
Spustit Manchester 4267667 2015-12-09T10:14:02.23
Zastavit Londýn 2817330 2015-12-09T10:23:43.18
Zrušit Manchester 4267667 2015-12-09T10:27:26.29
Zastavit Manchester 4267667 2015-12-09T10:28:31.72

Každá událost má ID relace (SessionId). Výzvou je spárovat události spuštění a zastavení s ID relace.

Příklad:

let Events = MyLogTable | where ... ;
Events
| where Name == "Start"
| project Name, City, SessionId, StartTime=timestamp
| join (Events
        | where Name == "Stop"
        | project StopTime=timestamp, SessionId)
    on SessionId
| project City, SessionId, StartTime, StopTime, Duration = StopTime - StartTime

Porovnání událostí spuštění a zastavení s ID relace:

  1. Pomocí příkazu let pojmenujte projekci tabulky, která je před zahájením spojení co nejdála vysunutá.
  2. Pomocí projektu můžete změnit názvy časových razítek tak, aby se ve výsledcích zobrazil čas zahájení i čas zastavení. project také vybere další sloupce, které se zobrazí ve výsledcích.
  3. Pomocí příkazu join můžete porovnat položky start a stop pro stejnou aktivitu. Pro každou aktivitu se vytvoří řádek.
  4. Opětovným použitím project přidejte sloupec, který zobrazí dobu trvání aktivity.

Tady je výstup:

City (Město) SessionId StartTime Čas ukončení Doba trvání
Londýn 2817330 2015-12-09T10:12:02.32 2015-12-09T10:23:43.18 00:11:40.46
Manchester 4267667 2015-12-09T10:14:02.23 2015-12-09T10:28:31.72 00:14:29.49

Získání relací bez použití ID relace

Předpokládejme, že události spuštění a zastavení nemají ID relace, se kterým bychom se mohli shodovat. Máme ale IP adresu klienta, ve kterém se relace proběhla. Za předpokladu, že každá adresa klienta provádí vždy jenom jednu relaci, můžeme každou počáteční událost spárovat s další událostí zastavení ze stejné IP adresy:

Příklad:

Events
| where Name == "Start"
| project City, ClientIp, StartTime = timestamp
| join  kind=inner
    (Events
    | where Name == "Stop"
    | project StopTime = timestamp, ClientIp)
    on ClientIp
| extend duration = StopTime - StartTime
    // Remove matches with earlier stops:
| where  duration > 0
    // Pick out the earliest stop for each start and client:
| summarize arg_min(duration, *) by bin(StartTime,1s), ClientIp

Při každém spuštění se join shoduje se všemi časy zastavení ze stejné IP adresy klienta. Ukázkový kód:

  • Odebere shody s dřívějšími časy zastavení.
  • Seskupením podle času zahájení a IP adresy získáte skupinu pro každou relaci.
  • bin Poskytuje funkci pro StartTime parametr . Pokud tento krok neuděláte, Kusto automaticky použije hodinové intervaly, které v některých časech spuštění odpovídají nesprávným časům zastavení.

arg_min vyhledá řádek s nejmenší dobou trvání v každé skupině a * parametr projde všemi ostatními sloupci.

Argument předpony min_ každého názvu sloupce.

Snímek obrazovky s tabulkou s výsledky se sloupci pro čas spuštění, IP adresu klienta, dobu trvání, město a nejstarší zastavení pro každou kombinaci času spuštění a klienta

Přidejte kód pro počítání dob trvání v intervalech s pohodlnou velikostí. V tomto příkladu se kvůli předvolbě pruhového grafu převedou časové intervaly na čísla dělením 1s :

    // Count the frequency of each duration:
    | summarize count() by duration=bin(min_duration/1s, 10)
      // Cut off the long tail:
    | where duration < 300
      // Display in a bar chart:
    | sort by duration asc | render barchart

Snímek obrazovky se sloupcovým grafem, který znázorňuje počet relací s dobami trvání v zadaných rozsazích

Úplný příklad

Logs
| filter ActivityId == "ActivityId with Blablabla"
| summarize max(Timestamp), min(Timestamp)
| extend Duration = max_Timestamp - min_Timestamp

wabitrace
| filter Timestamp >= datetime(2015-01-12 11:00:00Z)
| filter Timestamp < datetime(2015-01-12 13:00:00Z)
| filter EventText like "NotifyHadoopApplicationJobPerformanceCounters"
| extend Tenant = extract("tenantName=([^,]+),", 1, EventText)
| extend Environment = extract("environmentName=([^,]+),", 1, EventText)
| extend UnitOfWorkId = extract("unitOfWorkId=([^,]+),", 1, EventText)
| extend TotalLaunchedMaps = extract("totalLaunchedMaps=([^,]+),", 1, EventText, typeof(real))
| extend MapsSeconds = extract("mapsMilliseconds=([^,]+),", 1, EventText, typeof(real)) / 1000
| extend TotalMapsSeconds = MapsSeconds  / TotalLaunchedMaps
| filter Tenant == 'DevDiv' and Environment == 'RollupDev2'
| filter TotalLaunchedMaps > 0
| summarize sum(TotalMapsSeconds) by UnitOfWorkId
| extend JobMapsSeconds = sum_TotalMapsSeconds * 1
| project UnitOfWorkId, JobMapsSeconds
| join (
wabitrace
| filter Timestamp >= datetime(2015-01-12 11:00:00Z)
| filter Timestamp < datetime(2015-01-12 13:00:00Z)
| filter EventText like "NotifyHadoopApplicationJobPerformanceCounters"
| extend Tenant = extract("tenantName=([^,]+),", 1, EventText)
| extend Environment = extract("environmentName=([^,]+),", 1, EventText)
| extend UnitOfWorkId = extract("unitOfWorkId=([^,]+),", 1, EventText)
| extend TotalLaunchedReducers = extract("totalLaunchedReducers=([^,]+),", 1, EventText, typeof(real))
| extend ReducesSeconds = extract("reducesMilliseconds=([^,]+)", 1, EventText, typeof(real)) / 1000
| extend TotalReducesSeconds = ReducesSeconds / TotalLaunchedReducers
| filter Tenant == 'DevDiv' and Environment == 'RollupDev2'
| filter TotalLaunchedReducers > 0
| summarize sum(TotalReducesSeconds) by UnitOfWorkId
| extend JobReducesSeconds = sum_TotalReducesSeconds * 1
| project UnitOfWorkId, JobReducesSeconds )
on UnitOfWorkId
| join (
wabitrace
| filter Timestamp >= datetime(2015-01-12 11:00:00Z)
| filter Timestamp < datetime(2015-01-12 13:00:00Z)
| filter EventText like "NotifyHadoopApplicationJobPerformanceCounters"
| extend Tenant = extract("tenantName=([^,]+),", 1, EventText)
| extend Environment = extract("environmentName=([^,]+),", 1, EventText)
| extend JobName = extract("jobName=([^,]+),", 1, EventText)
| extend StepName = extract("stepName=([^,]+),", 1, EventText)
| extend UnitOfWorkId = extract("unitOfWorkId=([^,]+),", 1, EventText)
| extend LaunchTime = extract("launchTime=([^,]+),", 1, EventText, typeof(datetime))
| extend FinishTime = extract("finishTime=([^,]+),", 1, EventText, typeof(datetime))
| extend TotalLaunchedMaps = extract("totalLaunchedMaps=([^,]+),", 1, EventText, typeof(real))
| extend TotalLaunchedReducers = extract("totalLaunchedReducers=([^,]+),", 1, EventText, typeof(real))
| extend MapsSeconds = extract("mapsMilliseconds=([^,]+),", 1, EventText, typeof(real)) / 1000
| extend ReducesSeconds = extract("reducesMilliseconds=([^,]+)", 1, EventText, typeof(real)) / 1000
| extend TotalMapsSeconds = MapsSeconds  / TotalLaunchedMaps
| extend TotalReducesSeconds = (ReducesSeconds / TotalLaunchedReducers / ReducesSeconds) * ReducesSeconds
| extend CalculatedDuration = (TotalMapsSeconds + TotalReducesSeconds) * time(1s)
| filter Tenant == 'DevDiv' and Environment == 'RollupDev2')
on UnitOfWorkId
| extend MapsFactor = TotalMapsSeconds / JobMapsSeconds
| extend ReducesFactor = TotalReducesSeconds / JobReducesSeconds
| extend CurrentLoad = 1536 + (768 * TotalLaunchedMaps) + (1536 * TotalLaunchedMaps)
| extend NormalizedLoad = 1536 + (768 * TotalLaunchedMaps * MapsFactor) + (1536 * TotalLaunchedMaps * ReducesFactor)
| summarize sum(CurrentLoad), sum(NormalizedLoad) by  JobName
| extend SaveFactor = sum_NormalizedLoad / sum_CurrentLoad

Graf souběžných relací v průběhu času

Předpokládejme, že máte tabulku aktivit a jejich počátečních a koncových časů. Můžete zobrazit graf, který zobrazuje, kolik aktivit běží současně v průběhu času.

Tady je ukázkový vstup s názvem X:

SessionId StartTime Čas ukončení
pro 10:01:03 10:10:08
b 10:01:29 10:03:10
c 10:03:02 10:05:20

V případě grafu v minutových intervalech chcete spočítat každou spuštěnou aktivitu v každém minutovém intervalu.

Tady je průběžný výsledek:

X | extend samples = range(bin(StartTime, 1m), StopTime, 1m)

range vygeneruje pole hodnot v zadaných intervalech:

SessionId StartTime Čas zastavení Vzorky
pro 10:01:33 10:06:31 [10:01:00,10:02:00,... 10:06:00]
b 10:02:29 10:03:45 [10:02:00,10:03:00]
c 10:03:12 10:04:30 [10:03:00,10:04:00]

Místo zachování těchto polí je rozbalte pomocí mv-expand:

X | mv-expand samples = range(bin(StartTime, 1m), StopTime , 1m)
SessionId StartTime Čas zastavení Vzorky
pro 10:01:33 10:06:31 10:01:00
pro 10:01:33 10:06:31 10:02:00
pro 10:01:33 10:06:31 10:03:00
pro 10:01:33 10:06:31 10:04:00
pro 10:01:33 10:06:31 10:05:00
pro 10:01:33 10:06:31 10:06:00
b 10:02:29 10:03:45 10:02:00
b 10:02:29 10:03:45 10:03:00
c 10:03:12 10:04:30 10:03:00
c 10:03:12 10:04:30 10:04:00

Teď seskupte výsledky podle času výběru a spočítejte výskyty jednotlivých aktivit:

X
| mv-expand samples = range(bin(StartTime, 1m), StopTime , 1m)
| summarize count_SessionId = count() by bin(todatetime(samples),1m)
  • Použijte, todatetime() protože výsledkem mv-expand je sloupec dynamického typu.
  • Pokud bin() nezadáte interval summarize , použijte u číselných hodnot a kalendářních dat vždy bin() funkci pomocí výchozího intervalu.

Tady je výstup:

count_SessionId Vzorky
1 10:01:00
2 10:02:00
3 10:03:00
2 10:04:00
1 10:05:00
1 10:06:00

K vykreslení výsledků můžete použít pruhový graf nebo časový diagram.

Zavedení přihrádek null do souhrnu

summarize Když se operátor použije u klíče skupiny, který se skládá ze sloupce data a času, přihrádky těchto hodnot do intervalů s pevnou šířkou:

let StartTime=ago(12h);
let StopTime=now()
T
| where Timestamp > StartTime and Timestamp <= StopTime
| where ...
| summarize Count=count() by bin(Timestamp, 5m)

Tento příklad vytvoří tabulku, která má jeden řádek na skupinu řádků v T každé přihrádce po pěti minutách.

Co kód neudělá, je přidat "null bins" – řádky pro hodnoty časového intervalu mezi StartTime a StopTime , pro které není žádný odpovídající řádek v T. Je vhodné tabulku "vložit" do těchto přihrádek. Tady je jeden způsob, jak to udělat:

let StartTime=ago(12h);
let StopTime=now()
T
| where Timestamp > StartTime and Timestamp <= StopTime
| summarize Count=count() by bin(Timestamp, 5m)
| where ...
| union ( // 1
  range x from 1 to 1 step 1 // 2
  | mv-expand Timestamp=range(StartTime, StopTime, 5m) to typeof(datetime) // 3
  | extend Count=0 // 4
  )
| summarize Count=sum(Count) by bin(Timestamp, 5m) // 5

Tady je podrobné vysvětlení předchozího dotazu:

  1. Pomocí operátoru union přidejte do tabulky další řádky. Tyto řádky jsou vytvářeny výrazem union .
  2. Operátor range vytvoří tabulku, která má jeden řádek a sloupec. Tabulka se nepoužívá k ničemu jinému než k mv-expand práci.
  3. Operátor mv-expand funkce range vytvoří tolik řádků, kolik je pětiminutových intervalů mezi StartTime a EndTime.
  4. Použijte a Count of 0.
  5. Operátor seskupí summarize intervaly z původního (levého nebo vnějšího) argumentu do union. Operátor také do intervalu od vnitřního argumentu (řádky přihrádky s hodnotou null). Tento proces zajistí, že výstup má jeden řádek na přihrádku, jehož hodnota je buď nulová, nebo původní počet.

Získejte více ze svých dat pomocí Kusto se strojovým učením

Mnoho zajímavých případů použití využívá algoritmy strojového učení a odvozuje zajímavé přehledy z telemetrických dat. Tyto algoritmy často jako vstup vyžadují přísně strukturovanou datovou sadu. Nezpracovaná data protokolu obvykle neodpovídají požadované struktuře a velikosti.

Začněte tím, že vyhledáte anomálie v míře chyb konkrétní služby odvozování Bingu. Tabulka protokolů obsahuje 65 miliard záznamů. Následující základní dotaz vyfiltruje 250 000 chyb a pak vytvoří časovou řadu počtu chyb, která používá funkci detekce anomálií series_decompose_anomalies. Anomálie detekuje služba Kusto a v grafu časových řad jsou zvýrazněné jako červené tečky.

Logs
| where Timestamp >= datetime(2015-08-22) and Timestamp < datetime(2015-08-23)
| where Level == "e" and Service == "Inferences.UnusualEvents_Main"
| summarize count() by bin(Timestamp, 5min)
| render anomalychart

Služba identifikovala několik časových intervalů s podezřelou mírou chyb. K přiblížení tohoto časového rámce použijte Kusto. Potom spusťte dotaz, který sloupec agreguje Message . Zkuste najít nejčastější chyby.

Relevantní části celého trasování zásobníku zprávy se oříznou, aby se výsledky líp vešly na stránku.

Můžete vidět úspěšnou identifikaci osmi hlavních chyb. Další je ale dlouhá řada chyb, protože chybová zpráva byla vytvořena pomocí formátového řetězce, který obsahoval měnící se data:

Logs
| where Timestamp >= datetime(2015-08-22 05:00) and Timestamp < datetime(2015-08-22 06:00)
| where Level == "e" and Service == "Inferences.UnusualEvents_Main"
| summarize count() by Message
| top 10 by count_
| project count_, Message
Počet_ Zpráva
7125 ExecuteAlgorithmMethod pro metodu RunCycleFromInterimData selhalo...
7125 Volání inferenceHostService se nezdařilo. System.NullReferenceException: Odkaz na objekt není nastaven na instanci objektu...
7124 Neočekávaná chyba systému odvození. System.NullReferenceException: Odkaz na objekt není nastaven na instanci objektu...
5112 Neočekávaná chyba systému odvození. System.NullReferenceException: Odkaz na objekt není nastaven na instanci objektu.
174 Volání inferenceHostService se nezdařilo. System.ServiceModel.CommunicationException: Při zápisu do kanálu došlo k chybě:...
10 ExecuteAlgorithmMethod pro metodu RunCycleFromInterimData selhalo...
10 Chyba systému odvození. Microsoft.Bing.Platform.Inferences.Service.Managers.UserInterimDataManagerException:...
3 Volání inferenceHostService se nezdařilo. System.ServiceModel.CommunicationObjectFaultedException:...
1 Odvozovat systémová chyba... SocialGraph.BOSS.OperationResponse... AIS TraceId:8292FC561AC64BED8FA243808FE74EFD...
1 Odvozovat systémová chyba... SocialGraph.BOSS.OperationResponse... AIS TraceId: 5F79F7587FF943EC9B641E02E701AFBF...

V tomto okamžiku vám pomůže použití operátoru reduce . Operátor identifikoval v kódu 63 různých chyb, které vznikly ve stejném bodě instrumentace trasování. reduce pomáhá zaměřit se na další smysluplná trasování chyb v daném časovém intervalu.

Logs
| where Timestamp >= datetime(2015-08-22 05:00) and Timestamp < datetime(2015-08-22 06:00)
| where Level == "e" and Service == "Inferences.UnusualEvents_Main"
| reduce by Message with threshold=0.35
| project Count, Pattern
Počet Vzor
7125 ExecuteAlgorithmMethod pro metodu RunCycleFromInterimData selhalo...
7125 Volání inferenceHostService se nezdařilo. System.NullReferenceException: Odkaz na objekt není nastaven na instanci objektu...
7124 Neočekávaná chyba systému odvození. System.NullReferenceException: Odkaz na objekt není nastaven na instanci objektu...
5112 Neočekávaná chyba systému odvození. System.NullReferenceException: Odkaz na objekt není nastaven na instanci objektu...
174 Volání inferenceHostService se nezdařilo. System.ServiceModel.CommunicationException: Při zápisu do kanálu došlo k chybě:...
63 Chyba systému odvození. Microsoft.Bing.Platform.Inferences.*: Zápis * pro zápis do objektu BOSS.*: SocialGraph.BOSS.Reques...
10 ExecuteAlgorithmMethod pro metodu RunCycleFromInterimData selhalo...
10 Chyba systému odvození. Microsoft.Bing.Platform.Inferences.Service.Managers.UserInterimDataManagerException:...
3 Volání inferenceHostService se nezdařilo. System.ServiceModel.*: Objekt System.ServiceModel.Channels.*+*, pro ** je objekt *... ve společnosti Syst...

Teď máte dobrý přehled o hlavních chybách, které přispěly ke zjištěným anomáliím.

Pokud chcete porozumět vlivu těchto chyb na ukázkový systém, zvažte následující:

  • Tabulka Logs obsahuje další rozměrová data, například Component a Cluster.
  • Nový modul plug-in autocluster vám může pomoct získat přehled o komponentách a clusterech pomocí jednoduchého dotazu.

V následujícím příkladu jasně vidíte, že každá z prvních čtyř chyb je specifická pro komponentu. Přestože jsou tři hlavní chyby specifické pro cluster DB4, ke čtvrté chybě dochází ve všech clusterech.

Logs
| where Timestamp >= datetime(2015-08-22 05:00) and Timestamp < datetime(2015-08-22 06:00)
| where Level == "e" and Service == "Inferences.UnusualEvents_Main"
| evaluate autocluster()
Počet Procento (%) Součást Cluster Zpráva
7125 26.64 InferenceHostService DB4 ExecuteAlgorithmMethod pro metodu...
7125 26.64 Neznámá komponenta DB4 Volání inferenceHostService se nezdařilo...
7124 26.64 OdvozováníAlgorithmExecutor DB4 Neočekávaná chyba systému odvození...
5112 19.11 OdvozováníAlgorithmExecutor * Neočekávaná chyba systému odvození...

Mapování hodnot z jedné sady na jinou

Běžným případem použití dotazu je statické mapování hodnot. Statické mapování může přispět k tomu, aby byly výsledky lépe prezentovatelné.

Například v další tabulce DeviceModel určuje model zařízení. Použití modelu zařízení není vhodná forma odkazování na název zařízení. 

DeviceModel Počet
iPhone5,1 32
iPhone3,2 432
iPhone7,2 55
iPhone5,2 66

 Použití popisného názvu je pohodlnější:

Friendlyname Počet
iPhone 5 32
iPhone 4 432
iPhone 6 55
iPhone5 66

Následující dva příklady ukazují, jak přejít z používání modelu zařízení na popisný název k identifikaci zařízení. 

Mapování pomocí dynamického slovníku

Mapování můžete dosáhnout pomocí dynamického slovníku a dynamických přístupových objektů. Příklad:

// Dataset definition
let Source = datatable(DeviceModel:string, Count:long)
[
  'iPhone5,1', 32,
  'iPhone3,2', 432,
  'iPhone7,2', 55,
  'iPhone5,2', 66,
];
// Query start here
let phone_mapping = dynamic(
  {
    "iPhone5,1" : "iPhone 5",
    "iPhone3,2" : "iPhone 4",
    "iPhone7,2" : "iPhone 6",
    "iPhone5,2" : "iPhone5"
  });
Source
| project FriendlyName = phone_mapping[DeviceModel], Count
Friendlyname Počet
iPhone 5 32
iPhone 4 432
iPhone 6 55
iPhone5 66

Mapování pomocí statické tabulky

Mapování můžete také dosáhnout pomocí trvalé tabulky a operátoru join .

  1. Tabulku mapování vytvořte jenom jednou:

    .create table Devices (DeviceModel: string, FriendlyName: string)
    
    .ingest inline into table Devices
        ["iPhone5,1","iPhone 5"]["iPhone3,2","iPhone 4"]["iPhone7,2","iPhone 6"]["iPhone5,2","iPhone5"]
    
  2. Vytvořte tabulku obsahu zařízení:

    DeviceModel Friendlyname
    iPhone5,1 iPhone 5
    iPhone3,2 iPhone 4
    iPhone7,2 iPhone 6
    iPhone5,2 iPhone5
  3. Vytvoření zdroje testovací tabulky:

    .create table Source (DeviceModel: string, Count: int)
    
    .ingest inline into table Source ["iPhone5,1",32]["iPhone3,2",432]["iPhone7,2",55]["iPhone5,2",66]
    
  4. Připojte tabulky a spusťte projekt:

    Devices
    | join (Source) on DeviceModel
    | project FriendlyName, Count
    

Tady je výstup:

Friendlyname Počet
iPhone 5 32
iPhone 4 432
iPhone 6 55
iPhone5 66

Vytváření a používání tabulek dimenzí v čase dotazu

Často budete chtít spojit výsledky dotazu s tabulkou dimenzí ad hoc, která není uložená v databázi. Můžete definovat výraz, jehož výsledkem je tabulka vymezená na jeden dotaz.

Příklad:

// Create a query-time dimension table using datatable
let DimTable = datatable(EventType:string, Code:string)
  [
    "Heavy Rain", "HR",
    "Tornado",    "T"
  ]
;
DimTable
| join StormEvents on EventType
| summarize count() by Code

Tady je trochu složitější příklad:

// Create a query-time dimension table using datatable
let TeamFoundationJobResult = datatable(Result:int, ResultString:string)
  [
    -1, 'None', 0, 'Succeeded', 1, 'PartiallySucceeded', 2, 'Failed',
    3, 'Stopped', 4, 'Killed', 5, 'Blocked', 6, 'ExtensionNotFound',
    7, 'Inactive', 8, 'Disabled', 9, 'JobInitializationError'
  ]
;
JobHistory
  | where PreciseTimeStamp > ago(1h)
  | where Service  != "AX"
  | where Plugin has "Analytics"
  | sort by PreciseTimeStamp desc
  | join kind=leftouter TeamFoundationJobResult on Result
  | extend ExecutionTimeSpan = totimespan(ExecutionTime)
  | project JobName, StartTime, ExecutionTimeSpan, ResultString, ResultMessage

Načtení nejnovějších záznamů (podle časového razítka) pro každou identitu

Předpokládejme, že máte tabulku, která obsahuje:

  • Sloupec ID , který identifikuje entitu, ke které je každý řádek přidružený, například ID uživatele nebo ID uzlu
  • Sloupec timestamp , který poskytuje časový odkaz na řádek.
  • Jiné sloupce

Pomocí operátoru s horním vnořením můžete vytvořit dotaz, který vrátí poslední dva záznamy pro každou hodnotu ID sloupce, kde je nejnovější definován jako s nejvyšší hodnotou timestamp:

datatable(id:string, timestamp:datetime, bla:string)           // #1
  [
  "Barak",  datetime(2015-01-01), "1",
  "Barak",  datetime(2016-01-01), "2",
  "Barak",  datetime(2017-01-20), "3",
  "Donald", datetime(2017-01-20), "4",
  "Donald", datetime(2017-01-18), "5",
  "Donald", datetime(2017-01-19), "6"
  ]
| top-nested   of id        by dummy0=max(1),                  // #2
  top-nested 2 of timestamp by dummy1=max(timestamp),          // #3
  top-nested   of bla       by dummy2=max(1)                   // #4
| project-away dummy0, dummy1, dummy2                          // #5

Tady je podrobné vysvětlení předchozího dotazu (číslování odkazuje na čísla v komentářích ke kódu):

  1. Je datatable způsob, jak vytvořit testovací data pro demonstrační účely. Za normálních okolností byste tady použili skutečná data.
  2. Tento řádek v podstatě znamená, že vrátí všechny jedinečné hodnoty .id
  3. Tento řádek pak vrátí pro dva hlavní záznamy, které maximalizují:
    • Sloupec timestamp
    • Sloupce předchozí úrovně (tady, jen id)
    • Sloupec zadaný na této úrovni (zde) timestamp
  4. Tento řádek sečte hodnoty bla sloupce pro každý záznam vrácený předchozí úrovní. Pokud tabulka obsahuje další sloupce, které vás zajímají, můžete tento řádek zopakovat pro každý z těchto sloupců.
  5. Poslední řádek pomocí operátoru project-away odebere sloupce "extra", které jsou zavedeny nástrojem top-nested.

Rozšíření tabulky o procento z celkového výpočtu

Tabulkový výraz, který obsahuje číselný sloupec, je pro uživatele užitečnější, když je doprovázen jeho hodnotou v procentech z celkového součtu.

Předpokládejme například, že dotaz vytvoří následující tabulku:

SomeSeries SomeInt
Apple 100
Banán 200

Tabulku chcete zobrazit takto:

SomeSeries SomeInt Pct
Apple 100 33.3
Banán 200 66.6

Pokud chcete změnit způsob zobrazení tabulky, vypočítejte součet (součet) SomeInt sloupce a potom vydělte každou hodnotu tohoto sloupce součtem. Pro libovolné výsledky použijte operátor as.

Příklad:

// The following table literally represents a long calculation
// that ends up with an anonymous tabular value:
datatable (SomeInt:int, SomeSeries:string) [
  100, "Apple",
  200, "Banana",
]
// We now give this calculation a name ("X"):
| as X
// Having this name we can refer to it in the sub-expression
// "X | summarize sum(SomeInt)":
| extend Pct = 100 * bin(todouble(SomeInt) / toscalar(X | summarize sum(SomeInt)), 0.001)

Provádění agregací přes posuvné okno

Následující příklad ukazuje, jak shrnout sloupce pomocí posuvného okna. Pro dotaz použijte následující tabulku, která obsahuje ceny ovoce podle časových razítek.

Pomocí posuvného okna sedmi dnů vypočítejte minimální, maximální a celkové náklady každého ovoce za den. Každý záznam v sadě výsledků agreguje předchozích sedm dnů a výsledky obsahují záznam za den v období analýzy.

Tabulka ovoce:

Timestamp Ovoce Cena
2018-09-24 21:00:00.0000000 Bananas 3
2018-09-25 20:00:00.0000000 Jablka 9
2018-09-26 03:00:00.0000000 Bananas 4
2018-09-27 10:00:00.0000000 Švestky 8
2018-09-28 07:00:00.0000000 Bananas 6
2018-09-29 21:00:00.0000000 Bananas 8
2018-09-30 01:00:00.0000000 Švestky 2
2018-10-01 05:00:00.0000000 Bananas 0
2018-10-02 02:00:00.0000000 Bananas 0
2018-10-03 13:00:00.0000000 Švestky 4
2018-10-04 14:00:00.0000000 Jablka 8
2018-10-05 05:00:00.0000000 Bananas 2
2018-10-06 08:00:00.0000000 Švestky 8
2018-10-07 12:00:00.0000000 Bananas 0

Tady je agregační dotaz posuvného okna. Podívejte se na vysvětlení za výsledkem dotazu.

let _start = datetime(2018-09-24);
let _end = _start + 13d;
Fruits
| extend _bin = bin_at(Timestamp, 1d, _start) // #1
| extend _endRange = iif(_bin + 7d > _end, _end,
                            iff( _bin + 7d - 1d < _start, _start,
                                iff( _bin + 7d - 1d < _bin, _bin,  _bin + 7d - 1d)))  // #2
| extend _range = range(_bin, _endRange, 1d) // #3
| mv-expand _range to typeof(datetime) limit 1000000 // #4
| summarize min(Price), max(Price), sum(Price) by Timestamp=bin_at(_range, 1d, _start) ,  Fruit // #5
| where Timestamp >= _start + 7d; // #6

Tady je výstup:

Timestamp Ovoce min_Price max_Price sum_Price
2018-10-01 00:00:00.0000000 Jablka 9 9 9
2018-10-01 00:00:00.0000000 Bananas 0 8 18
2018-10-01 00:00:00.0000000 Švestky 2 8 10
2018-10-02 00:00:00.0000000 Bananas 0 8 18
2018-10-02 00:00:00.0000000 Švestky 2 8 10
2018-10-03 00:00:00.0000000 Švestky 2 8 14
2018-10-03 00:00:00.0000000 Bananas 0 8 14
2018-10-04 00:00:00.0000000 Bananas 0 8 14
2018-10-04 00:00:00.0000000 Švestky 2 4 6
2018-10-04 00:00:00.0000000 Jablka 8 8 8
2018-10-05 00:00:00.0000000 Bananas 0 8 10
2018-10-05 00:00:00.0000000 Švestky 2 4 6
2018-10-05 00:00:00.0000000 Jablka 8 8 8
2018-10-06 00:00:00.0000000 Švestky 2 8 14
2018-10-06 00:00:00.0000000 Bananas 0 2 2
2018-10-06 00:00:00.0000000 Jablka 8 8 8
2018-10-07 00:00:00.0000000 Bananas 0 2 2
2018-10-07 00:00:00.0000000 Švestky 4 8 12
2018-10-07 00:00:00.0000000 Jablka 8 8 8

Dotaz "roztáhne" (duplikuje) každý záznam ve vstupní tabulce po dobu sedmi dnů od jeho skutečného výskytu. Každý záznam se ve skutečnosti zobrazí sedmkrát. V důsledku toho denní agregace zahrnuje všechny záznamy z předchozích sedmi dnů.

Tady je podrobné vysvětlení předchozího dotazu:

  1. Interval každého záznamu na jeden den (vzhledem k _start).
  2. Určete konec rozsahu pro každý záznam: _bin + 7d, pokud hodnota není mimo rozsah _start a _end, v takovém případě je upravena.
  3. Pro každý záznam vytvořte pole sedmi dnů (časová razítka) počínaje dnem aktuálního záznamu.
  4. mv-expand pole, čímž se každý záznam duplikuje na sedm záznamů, jeden den od sebe.
  5. Proveďte agregační funkci pro každý den. Vzhledem k č. 4 tento krok ve skutečnosti shrnuje posledních sedm dnů.
  6. Data za prvních sedm dnů nejsou úplná, protože pro prvních sedm dnů neexistuje žádné sedmidenní období zpětného vyhledávání. Prvních sedm dnů je vyloučeno z konečného výsledku. V příkladu se účastní pouze agregace pro 1.10.2018.

Vyhledání předchozí události

Další příklad ukazuje, jak najít předchozí událost mezi dvěma datovými sadami.

Máte dvě datové sady, A a B. Pro každý záznam v datové sadě B vyhledejte jeho předchozí událost v datové sadě A (to znamená záznam v A, arg_max který je stále starší než B).

Tady jsou ukázkové datové sady:

let A = datatable(Timestamp:datetime, ID:string, EventA:string)
[
    datetime(2019-01-01 00:00:00), "x", "Ax1",
    datetime(2019-01-01 00:00:01), "x", "Ax2",
    datetime(2019-01-01 00:00:02), "y", "Ay1",
    datetime(2019-01-01 00:00:05), "y", "Ay2",
    datetime(2019-01-01 00:00:00), "z", "Az1"
];
let B = datatable(Timestamp:datetime, ID:string, EventB:string)
[
    datetime(2019-01-01 00:00:03), "x", "B",
    datetime(2019-01-01 00:00:04), "x", "B",
    datetime(2019-01-01 00:00:04), "y", "B",
    datetime(2019-01-01 00:02:00), "z", "B"
];
A; B
Timestamp ID EventB
2019-01-01 00:00:00.0000000 x Ax1
2019-01-01 00:00:00.0000000 z Az1
2019-01-01 00:00:01.0000000 x Ax2
2019-01-01 00:00:02.0000000 y Ay1
2019-01-01 00:00:05.0000000 y Ay2

Timestamp ID EventA
2019-01-01 00:00:03.0000000 x B
2019-01-01 00:00:04.0000000 x B
2019-01-01 00:00:04.0000000 y B
2019-01-01 00:02:00.0000000 z B

Očekávaný výstup:

ID Timestamp Událost B A_Timestamp EventA
x 2019-01-01 00:00:03.0000000 B 2019-01-01 00:00:01.0000000 Ax2
x 2019-01-01 00:00:04.0000000 B 2019-01-01 00:00:01.0000000 Ax2
y 2019-01-01 00:00:04.0000000 B 2019-01-01 00:00:02.0000000 Ay1
z 2019-01-01 00:02:00.0000000 B 2019-01-01 00:00:00.0000000 Az1

K tomuto problému doporučujeme dva různé přístupy. Obě sady můžete otestovat na konkrétní datové sadě a najít tu, která je pro váš scénář nejvhodnější.

Poznámka

Každý přístup může u různých datových sad běžet jinak.

Přístup 1

Tento přístup serializuje obě datové sady podle ID a časového razítka. Potom seskupí všechny události v datové sadě B se všemi předchozími událostmi v datové sadě A. Nakonec vybere arg_max ze všech událostí v datové sadě A ve skupině.

A
| extend A_Timestamp = Timestamp, Kind="A"
| union (B | extend B_Timestamp = Timestamp, Kind="B")
| order by ID, Timestamp asc
| extend t = iff(Kind == "A" and (prev(Kind) != "A" or prev(Id) != ID), 1, 0)
| extend t = row_cumsum(t)
| summarize Timestamp=make_list(Timestamp), EventB=make_list(EventB), arg_max(A_Timestamp, EventA) by t, ID
| mv-expand Timestamp to typeof(datetime), EventB to typeof(string)
| where isnotempty(EventB)
| project-away t

Přístup 2

Tento přístup k řešení problému vyžaduje maximální dobu zpětného vyhledávání. Tento přístup sleduje, o kolik starší může být záznam v datové sadě A ve srovnání s datovou sadou B. Metoda pak tyto dvě datové sady spojí na základě ID a tohoto období zpětného vyhledávání.

Vytvoří join všechny možné kandidáty, všechny záznamy datové sady A, které jsou starší než záznamy v datové sadě B a v rámci období zpětného vyhledávání. Potom se vyfiltruje nejbližší datová sada B podle arg_min (TimestampB - TimestampA). Čím kratší je doba zpětného vyhledávání, tím lepší budou výsledky dotazu.

V následujícím příkladu je období zpětného vyhledávání nastavené na 1mhodnotu . Záznam s ID z nemá odpovídající A událost, protože jeho A událost je starší o dvě minuty.

let _maxLookbackPeriod = 1m;
let _internalWindowBin = _maxLookbackPeriod / 2;
let B_events = B
    | extend ID = new_guid()
    | extend _time = bin(Timestamp, _internalWindowBin)
    | extend _range = range(_time - _internalWindowBin, _time + _maxLookbackPeriod, _internalWindowBin)
    | mv-expand _range to typeof(datetime)
    | extend B_Timestamp = Timestamp, _range;
let A_events = A
    | extend _time = bin(Timestamp, _internalWindowBin)
    | extend _range = range(_time - _internalWindowBin, _time + _maxLookbackPeriod, _internalWindowBin)
    | mv-expand _range to typeof(datetime)
    | extend A_Timestamp = Timestamp, _range;
B_events
    | join kind=leftouter (
        A_events
) on ID, _range
| where isnull(A_Timestamp) or (A_Timestamp <= B_Timestamp and B_Timestamp <= A_Timestamp + _maxLookbackPeriod)
| extend diff = coalesce(B_Timestamp - A_Timestamp, _maxLookbackPeriod*2)
| summarize arg_min(diff, *) by ID
| project ID, B_Timestamp, A_Timestamp, EventB, EventA
ID B_Timestamp A_Timestamp Událost B EventA
x 2019-01-01 00:00:03.0000000 2019-01-01 00:00:01.0000000 B Ax2
x 2019-01-01 00:00:04.0000000 2019-01-01 00:00:01.0000000 B Ax2
y 2019-01-01 00:00:04.0000000 2019-01-01 00:00:02.0000000 B Ay1
z 2019-01-01 00:02:00.0000000 B

Další kroky

Tento článek popisuje běžné potřeby dotazů ve službě Azure Monitor a popisuje, jak můžete dotazovací jazyk Kusto použít k jejich splnění.

Operace s řetězci

Následující části obsahují příklady práce s řetězci při použití dotazovací jazyk Kusto.

Řetězce a jak je utéct do řídicího znaku

Řetězcové hodnoty jsou zabalené jednoduchými nebo dvojitými uvozovkami. Nalevo od znaku přidejte zpětné lomítko (\) pro řídicí znak: \t pro tabulátor, \n pro nový řádek, \' pro znak jednoduché uvozovky a \" pro znak dvojité uvozovky.

print "this is a 'string' literal in double \" quotes"
print 'this is a "string" literal in single \' quotes'

Pokud chcete zabránit tomu, aby znak "\" působil jako řídicí znak, přidejte k řetězci @jako předponu:

print @"C:\backslash\not\escaped\with @ prefix"

Porovnání řetězců

Operátor Popis Case-sensitive Příklad (výnosy true)
== Je rovno Ano "aBc" == "aBc"
!= Nerovná se Ano "abc" != "ABC"
=~ Je rovno Ne "abc" =~ "ABC"
!~ Nerovná se Ne "aBc" !~ "xyz"
has Hodnota na pravé straně je celý termín v hodnotě na levé straně. Ne "North America" has "america"
!has Hodnota na pravé straně není úplný termín v hodnotě na levé straně. Ne "North America" !has "amer"
has_cs Hodnota na pravé straně je celý termín v hodnotě na levé straně. Ano "North America" has_cs "America"
!has_cs Hodnota na pravé straně není úplný termín v hodnotě na levé straně. Ano "North America" !has_cs "amer"
hasprefix Hodnota na pravé straně je předpona termínu v hodnotě na levé straně. Ne "North America" hasprefix "ame"
!hasprefix Hodnota na pravé straně není předponou termínu v hodnotě na levé straně. Ne "North America" !hasprefix "mer"
hasprefix_cs Hodnota na pravé straně je předpona termínu v hodnotě na levé straně. Ano "North America" hasprefix_cs "Ame"
!hasprefix_cs Hodnota na pravé straně není předponou termínu v hodnotě na levé straně. Ano "North America" !hasprefix_cs "CA"
hassuffix Hodnota na pravé straně je přípona termínu v hodnotě na levé straně. Ne "North America" hassuffix "ica"
!hassuffix Hodnota na pravé straně není přípona výrazu v hodnotě na levé straně. Ne "North America" !hassuffix "americ"
hassuffix_cs Hodnota na pravé straně je přípona termínu v hodnotě na levé straně. Ano "North America" hassuffix_cs "ica"
!hassuffix_cs Hodnota na pravé straně není přípona výrazu v hodnotě na levé straně. Ano "North America" !hassuffix_cs "icA"
contains Hodnota na pravé straně se vyskytuje jako podsekvence hodnoty na levé straně. Ne "FabriKam" contains "BRik"
!contains Hodnota na pravé straně se v hodnotě na levé straně nenachází Ne "Fabrikam" !contains "xyz"
contains_cs Hodnota na pravé straně se vyskytuje jako podsekvence hodnoty na levé straně. Ano "FabriKam" contains_cs "Kam"
!contains_cs Hodnota na pravé straně se v hodnotě na levé straně nenachází Ano "Fabrikam" !contains_cs "Kam"
startswith Hodnota na pravé straně je počáteční podsekvence hodnoty na levé straně. Ne "Fabrikam" startswith "fab"
!startswith Hodnota na pravé straně není počáteční podsekvence hodnoty na levé straně. Ne "Fabrikam" !startswith "kam"
startswith_cs Hodnota na pravé straně je počáteční podsekvence hodnoty na levé straně. Ano "Fabrikam" startswith_cs "Fab"
!startswith_cs Hodnota na pravé straně není počáteční podsekvence hodnoty na levé straně. Ano "Fabrikam" !startswith_cs "fab"
endswith Hodnota na pravé straně je uzavírací podsekvence hodnoty na levé straně. Ne "Fabrikam" endswith "Kam"
!endswith Hodnota na pravé straně není uzavírací podsekvence hodnoty na levé straně. Ne "Fabrikam" !endswith "brik"
endswith_cs Hodnota na pravé straně je uzavírací podsekvence hodnoty na levé straně. Ano "Fabrikam" endswith "Kam"
!endswith_cs Hodnota na pravé straně není uzavírací podsekvence hodnoty na levé straně. Ano "Fabrikam" !endswith "brik"
matches regex Hodnota na levé straně obsahuje shodu s hodnotou na pravé straně. Ano "Fabrikam" matches regex "b.*k"
in Rovná se jednomu z prvků Ano "abc" in ("123", "345", "abc")
!in Nerovná se žádnému z prvků. Ano "bca" !in ("123", "345", "abc")

countof

Spočítá výskyty podřetězce v řetězci. Může odpovídat prostým řetězcům nebo použít regulární výraz (regex). Prosté shody řetězců se můžou překrývat, ale shody regulárních výrazů se nepřekrývají.

countof(text, search [, kind])
  • text: Vstupní řetězec
  • search: Prostý řetězec nebo regulární výraz, který se má shodovat uvnitř textu
  • kind: normální | regulární výraz (výchozí hodnota: normální).

Vrátí počet, kolikrát může být hledaný řetězec v kontejneru spárován. Prosté shody řetězců se můžou překrývat, ale shody regulárních výrazů se nepřekrývají.

Prosté shody řetězců

print countof("The cat sat on the mat", "at");  //result: 3
print countof("aaa", "a");  //result: 3
print countof("aaaa", "aa");  //result: 3 (not 2!)
print countof("ababa", "ab", "normal");  //result: 2
print countof("ababa", "aba");  //result: 2

Regulární výrazy odpovídají

print countof("The cat sat on the mat", @"\b.at\b", "regex");  //result: 3
print countof("ababa", "aba", "regex");  //result: 1
print countof("abcabc", "a.c", "regex");  // result: 2

Extrahovat

Získá shodu pro regulární výraz z konkrétního řetězce. Volitelně může převést extrahovaný podřetěžec na zadaný typ.

extract(regex, captureGroup, text [, typeLiteral])
  • regex: Regulární výraz.
  • captureGroup: Kladná celočíselná konstanta, která označuje skupinu zachycení, která se má extrahovat. Použijte 0 pro celou shodu, 1 pro hodnotu odpovídající první závorce () v regulárním výrazu a 2 nebo více pro následné závorky.
  • text - Řetězec, který chcete vyhledat.
  • typeLiteral – Volitelný typ literálu (například typeof(long)). Pokud je zadaný, extrahovaný podřetěžce je převeden na tento typ.

Vrátí podřetěc odpovídající zadané skupině captureGroupzachycení , volitelně převedený na typeLiteral. Pokud neexistuje žádná shoda nebo se převod typu nezdaří, vrátí hodnotu null.

Následující příklad extrahuje poslední oktet ze záznamu prezenčního ComputerIP signálu:

Heartbeat
| where ComputerIP != ""
| take 1
| project ComputerIP, last_octet=extract("([0-9]*$)", 1, ComputerIP)

Následující příklad extrahuje poslední oktet, přetypuje ho na skutečný typ (číslo) a pak vypočítá další hodnotu IP adresy:

Heartbeat
| where ComputerIP != ""
| take 1
| extend last_octet=extract("([0-9]*$)", 1, ComputerIP, typeof(real))
| extend next_ip=(last_octet+1)%255
| project ComputerIP, last_octet, next_ip

V dalším příkladu se v řetězci Trace vyhledá definice Duration. Shoda se přetypuje na real a vynásobí se časovou konstantou (1 s), která se pak přetypuje Duration na typ timespan.

let Trace="A=12, B=34, Duration=567, ...";
print Duration = extract("Duration=([0-9.]+)", 1, Trace, typeof(real));  //result: 567
print Duration_seconds =  extract("Duration=([0-9.]+)", 1, Trace, typeof(real)) * time(1s);  //result: 00:09:27

isempty, isnotempty

  • isempty vrátí hodnotu true , pokud je argument prázdný řetězec nebo null (viz isnull).
  • isnotempty vrátí hodnotu , true pokud argument není prázdný řetězec nebo hodnota null (viz isnotnull). Alias: isnotempty.
isempty(value)
isnotempty(value)

Příklad

print isempty("");  // result: true

print isempty("0");  // result: false

print isempty(0);  // result: false

print isempty(5);  // result: false

Heartbeat | where isnotempty(ComputerIP) | take 1  // return 1 Heartbeat record in which ComputerIP isn't empty

parse_url

Rozdělí adresu URL na její části, jako je protokol, hostitel a port, a pak vrátí objekt slovníku, který obsahuje části jako řetězce.

parse_url(urlstring)

Příklad

print parse_url("http://user:pass@contoso.com/icecream/buy.aspx?a=1&b=2#tag")

Tady je výstup:

{
  "Scheme" : "http",
  "Host" : "contoso.com",
  "Port" : "80",
  "Path" : "/icecream/buy.aspx",
  "Username" : "user",
  "Password" : "pass",
  "Query Parameters" : {"a":"1","b":"2"},
  "Fragment" : "tag"
}

replace_regex

Nahradí všechny shody regulárního výrazu jiným řetězcem.

replace_regex(regex, rewrite, input_text)
  • regex: Regulární výraz, který se má shodovat. Může obsahovat skupiny zachycení v závorkách ().
  • rewrite: Náhradní regulární výraz pro každou shodu provedenou porovnáním regulárního výrazu. Pomocí \0 můžete odkazovat na celou shodu, \1 pro první skupinu zachycení, \2 atd. pro další skupiny zachycení.
  • input_text: Vstupní řetězec, ve který se má hledat.

Vrátí text po nahrazení všech shod regulárních výrazů vyhodnocením přepsání. Shody se nepřekrývají.

Příklad

SecurityEvent
| take 1
| project Activity
| extend replaced = replace_regex(@"(\d+) -", @"Activity ID \1: ", Activity)

Tady je výstup:

Aktivita Nahrazen
4663 – Došlo k pokusu o přístup k objektu. ID aktivity 4663: Došlo k pokusu o přístup k objektu.

Rozdělit

Rozdělí určitý řetězec podle zadaného oddělovače a pak vrátí pole výsledných podřetězců.

split(source, delimiter [, requestedIndex])
  • source: Řetězec, který se má rozdělit podle zadaného oddělovače.
  • delimiter: Oddělovač, který se použije k rozdělení zdrojového řetězce.
  • requestedIndex: Volitelný index založený na nule. Pokud je zadaný, vrácené pole řetězců obsahuje pouze tuto položku (pokud existuje).

Příklad

print split("aaa_bbb_ccc", "_");    // result: ["aaa","bbb","ccc"]
print split("aa_bb", "_");          // result: ["aa","bb"]
print split("aaa_bbb_ccc", "_", 1);	// result: ["bbb"]
print split("", "_");              	// result: [""]
print split("a__b", "_");           // result: ["a","","b"]
print split("aabbcc", "bb");        // result: ["aa","cc"]

strcat

Zřetězí řetězcové argumenty (podporuje 1–16 argumentů).

strcat("string1", "string2", "string3")

Příklad

print strcat("hello", " ", "world")	// result: "hello world"

strlen

Vrátí délku řetězce.

strlen("text_to_evaluate")

Příklad

print strlen("hello")	// result: 5

Podřetězec

Extrahuje podřetězc z konkrétního zdrojového řetězce, počínaje zadaným indexem. Volitelně můžete zadat délku požadovaného podřetěžce.

substring(source, startingIndex [, length])
  • source: Zdrojový řetězec, ze kterého je podřetězc převzat.
  • startingIndex: Pozice počátečního znaku požadovaného podřetětce založeného na nule.
  • length: Volitelný parametr, který můžete použít k určení požadované délky vráceného podřetěžce.

Příklad

print substring("abcdefg", 1, 2);	// result: "bc"
print substring("123456", 1);		// result: "23456"
print substring("123456", 2, 2);	// result: "34"
print substring("ABCD", 0, 2);	// result: "AB"

tolower, toupper

Převede určitý řetězec na všechna malá nebo všechna velká písmena.

tolower("value")
toupper("value")

Příklad

print tolower("HELLO");	// result: "hello"
print toupper("hello");	// result: "HELLO"

Operace s datem a časem

Následující části obsahují příklady práce s hodnotami data a času při použití dotazovací jazyk Kusto.

Základy data a času

Dotazovací jazyk Kusto má dva hlavní datové typy přidružené k datům a časům: datetime a timespan. Všechna data jsou vyjádřena v UTC. I když je podporováno více formátů data a času, preferuje se formát ISO-8601.

Časové intervaly jsou vyjádřeny jako desetinná čárka následovaná časovou jednotkou:

Zkratka Časová jednotka
d day
h hour
m minute
s second
Paní Milisekund
mikrosekundy mikrosekundy
Zaškrtněte Nanosecond

Hodnoty data a času můžete vytvořit přetypováním řetězce pomocí operátoru todatetime . Pokud například chcete zkontrolovat prezentní signály virtuálních počítačů odesílané v určitém časovém rámci, zadejte časový rozsah pomocí operátoru between :

Heartbeat
| where TimeGenerated between(datetime("2018-06-30 22:46:42") .. datetime("2018-07-01 00:57:27"))

Dalším běžným scénářem je porovnání hodnoty data a času se současnou hodnotou. Pokud například chcete zobrazit všechny prezentní signály za poslední dvě minuty, můžete použít now operátor společně s časovým rozsahem, který představuje dvě minuty:

Heartbeat
| where TimeGenerated > now() - 2m

Zástupce je k dispozici také pro tuto funkci:

Heartbeat
| where TimeGenerated > now(-2m)

Nejkratší a nejčtenější metoda používá ago operátor:

Heartbeat
| where TimeGenerated > ago(2m)

Předpokládejme, že místo toho, abyste znali čas začátku a konce, znáte čas zahájení a dobu trvání. Dotaz můžete přepsat:

let startDatetime = todatetime("2018-06-30 20:12:42.9");
let duration = totimespan(25m);
Heartbeat
| where TimeGenerated between(startDatetime .. (startDatetime+duration) )
| extend timeFromStart = TimeGenerated - startDatetime

Převod časových jednotek

Můžete chtít vyjádřit hodnotu data a času nebo časového rozsahu v jiné než výchozí časové jednotce. Pokud například kontrolujete chybové události za posledních 30 minut a potřebujete počítaný sloupec, který ukazuje, jak dlouho k události došlo, můžete použít tento dotaz:

Event
| where TimeGenerated > ago(30m)
| where EventLevelName == "Error"
| extend timeAgo = now() - TimeGenerated

Sloupec timeAgo obsahuje hodnoty jako 00:09:31.5118992, které jsou naformátované jako hh:mm:ss.fffffff. Pokud chcete tyto hodnoty naformátovat na number počet minut od počátečního času, vydělte tuto hodnotu hodnotou :1m

Event
| where TimeGenerated > ago(30m)
| where EventLevelName == "Error"
| extend timeAgo = now() - TimeGenerated
| extend timeAgoMinutes = timeAgo/1m

Agregace a dělení podle časových intervalů

Dalším běžným scénářem je potřeba získat statistiky pro konkrétní časové období v konkrétní časové jednotce. Pro tento scénář můžete jako součást summarize klauzule použít bin operátor.

Pomocí následujícího dotazu získáte počet událostí, ke kterým došlo každých pět minut během poslední půl hodiny:

Event
| where TimeGenerated > ago(30m)
| summarize events_count=count() by bin(TimeGenerated, 5m)

Tento dotaz vytvoří následující tabulku:

TimeGenerated(UTC) events_count
2018-08-01T09:30:00 54
2018-08-01T09:35:00 41
2018-08-01T09:40:00 42
2018-08-01T09:45:00 41
2018-08-01T09:50:00 41
2018-08-01T09:55:00 16

Další způsob, jak vytvořit kontejnery výsledků, je použít funkce jako startofday:

Event
| where TimeGenerated > ago(4d)
| summarize events_count=count() by startofday(TimeGenerated)

Tady je výstup:

časové razítko Počet_
2018-07-28T00:00:00 7,136
2018-07-29T00:00:00 12,315
2018-07-30T00:00:00 16,847
2018-07-31T00:00:00 12,616
2018-08-01T00:00:00 5,416

Časová pásma

Vzhledem k tomu, že všechny hodnoty data a času jsou vyjádřeny v utc, je často užitečné tyto hodnoty převést na místní časové pásmo. Pomocí tohoto výpočtu můžete například převést časy UTC na PST:

Event
| extend localTimestamp = TimeGenerated - 8h

Agregace

Následující části obsahují příklady agregace výsledků dotazu při použití dotazovací jazyk Kusto.

Počet

Spočítejte počet řádků v sadě výsledků po použití filtrů. Následující příklad vrátí celkový počet řádků v Perf tabulce za posledních 30 minut. Výsledky se vrátí ve sloupci s názvem, count_ pokud ke sloupci nepřiřadíte konkrétní název:

Perf
| where TimeGenerated > ago(30m)
| summarize count()
Perf
| where TimeGenerated > ago(30m)
| summarize num_of_records=count()

Vizualizace časového grafu může být užitečná k zobrazení trendu v průběhu času:

Perf
| where TimeGenerated > ago(30m)
| summarize count() by bin(TimeGenerated, 5m)
| render timechart

Výstup z tohoto příkladu ukazuje spojnici trendu Perf počtu záznamů v pětiminutových intervalech:

Snímek obrazovky spojnicového grafu, který zobrazuje spojnici trendu počtu záznamů výkonu v pětiminutových intervalech

dcount, dcountif

K počítání jedinečných hodnot v konkrétním sloupci použijte dcount a dcountif . Následující dotaz vyhodnotí, kolik různých počítačů odeslalo prezenčních signálů za poslední hodinu:

Heartbeat
| where TimeGenerated > ago(1h)
| summarize dcount(Computer)

Pokud chcete spočítat jenom počítače s Linuxem, které odesílaly prezenční signály, použijte dcountif:

Heartbeat
| where TimeGenerated > ago(1h)
| summarize dcountif(Computer, OSType=="Linux")

Vyhodnocení podskupin

Pokud chcete provést počet nebo jiné agregace u podskupin v datech, použijte by klíčové slovo . Pokud například chcete spočítat počet různých počítačů s Linuxem, které odeslaly prezenční signály v každé zemi nebo oblasti, použijte tento dotaz:

Heartbeat
| where TimeGenerated > ago(1h)
| summarize distinct_computers=dcountif(Computer, OSType=="Linux") by RemoteIPCountry
RemoteIPCountry distinct_computers
USA 19
Kanada 3
Irsko 0
Spojené království 0
Nizozemsko 2

Pokud chcete analyzovat ještě menší podskupiny dat, přidejte do oddílu by názvy sloupců. Můžete například chtít spočítat různé počítače z každé země nebo oblasti podle typu operačního systému (OSType):

Heartbeat
| where TimeGenerated > ago(1h)
| summarize distinct_computers=dcountif(Computer, OSType=="Linux") by RemoteIPCountry, OSType

Percentil

Pokud chcete najít medián, použijte percentile funkci s hodnotou k určení percentilu:

Perf
| where TimeGenerated > ago(30m)
| where CounterName == "% Processor Time" and InstanceName == "_Total"
| summarize percentiles(CounterValue, 50) by Computer

Můžete také zadat různé percentily, abyste získali agregovaný výsledek pro každý z nich:

Perf
| where TimeGenerated > ago(30m)
| where CounterName == "% Processor Time" and InstanceName == "_Total"
| summarize percentiles(CounterValue, 25, 50, 75, 90) by Computer

Z výsledků může být vidět, že některé počítačové procesory mají podobné hodnoty mediánu. I když jsou některé počítače kolem mediánu stabilní, jiné hlásí mnohem nižší a vyšší hodnoty procesoru. Vysoké a nízké hodnoty znamenají, že počítače zaznamenaly špičky.

Variance

K přímému vyhodnocení rozptylu hodnoty použijte metody směrodatné odchylky a rozptylu:

Perf
| where TimeGenerated > ago(30m)
| where CounterName == "% Processor Time" and InstanceName == "_Total"
| summarize stdev(CounterValue), variance(CounterValue) by Computer

Dobrý způsob, jak analyzovat stabilitu využití procesoru, je zkombinovat stdev s výpočtem mediánu:

Perf
| where TimeGenerated > ago(130m)
| where CounterName == "% Processor Time" and InstanceName == "_Total"
| summarize stdev(CounterValue), percentiles(CounterValue, 50) by Computer

Generování seznamů a sad

Můžete použít make_list k kontingenčnímu uspořádání dat podle pořadí hodnot v konkrétním sloupci. Můžete například chtít prozkoumat nejběžnější události objednávek, které probíhají na vašich počítačích. Data můžete v podstatě uspořádat podle pořadí EventID hodnot v jednotlivých počítačích:

Event
| where TimeGenerated > ago(12h)
| order by TimeGenerated desc
| summarize make_list(EventID) by Computer

Tady je výstup:

Počítač list_EventID
počítač 1 [704,701,1501,1500,1085,704,704,701]
počítač 2 [326,105,302,301,300,102]
... ...

make_list vygeneruje seznam v pořadí, v jakém do něj byla předána data. Pokud chcete řadit události od nejstaršího po nejnovější, použijte asc v order příkazu místo desc.

Může být užitečné vytvořit seznam pouze jedinečných hodnot. Tento seznam se nazývá sada a můžete ho make_set vygenerovat pomocí příkazu :

Event
| where TimeGenerated > ago(12h)
| order by TimeGenerated desc
| summarize make_set(EventID) by Computer

Tady je výstup:

Počítač list_EventID
počítač 1 [704,701,1501,1500,1085]
počítač 2 [326,105,302,301,300,102]
... ...

Podobně jako make_listi služba make_set funguje s seřazenými daty. Příkaz make_set generuje pole na základě pořadí řádků, které jsou do něj předány.

Rozbalit seznamy

Inverzní operace nebo make_listmake_set je mv-expand. Příkaz mv-expand rozbalí seznam hodnot a oddělí řádky. Může se rozšířit napříč libovolným počtem dynamických sloupců, včetně sloupců JSON a polí. V tabulce můžete například zkontrolovat Heartbeat řešení, která odesílala data z počítačů, které odeslaly prezenční signál v poslední hodině:

Heartbeat
| where TimeGenerated > ago(1h)
| project Computer, Solutions

Tady je výstup:

Počítač Řešení
počítač 1 "security", "updates", "changeTracking"
počítač 2 "zabezpečení", "aktualizace"
počítač3 "antiMalware", "changeTracking"
... ...

Umožňuje mv-expand zobrazit každou hodnotu v samostatném řádku místo v seznamu odděleném čárkami:

Heartbeat
| where TimeGenerated > ago(1h)
| project Computer, split(Solutions, ",")
| mv-expand Solutions

Tady je výstup:

Počítač Řešení
počítač 1 "zabezpečení"
počítač 1 "updates" (aktualizace)
počítač 1 "changeTracking"
počítač 2 "zabezpečení"
počítač 2 "updates" (aktualizace)
počítač3 "antiMalware"
počítač3 "changeTracking"
... ...

Pomocí příkazu make_list můžete položky seskupit. Ve výstupu se zobrazí seznam počítačů podle řešení:

Pomocí příkazu make_list můžete položky seskupit. Ve výstupu se zobrazí seznam počítačů podle řešení:

Heartbeat
| where TimeGenerated > ago(1h)
| project Computer, split(Solutions, ",")
| mv-expand Solutions
| summarize make_list(Computer) by tostring(Solutions) 

Tady je výstup:

Řešení list_Computer
"zabezpečení" ["počítač1", "počítač2"]
"updates" (aktualizace) ["počítač1", "počítač2"]
"changeTracking" ["počítač1", "počítač3"]
"antiMalware" ["počítač3"]
... ...

Chybějící intervaly

Užitečnou aplikací je mv-expand vyplnění výchozích hodnot pro chybějící intervaly. Předpokládejme například, že hledáte dobu provozu konkrétního počítače tím, že zkoumáte jeho prezenční signál. Chcete také zobrazit zdroj prezenčního signálu, který je ve sloupci Category . Normálně bychom použili základní summarize příkaz:

Heartbeat
| where TimeGenerated > ago(12h)
| summarize count() by Category, bin(TimeGenerated, 1h)

Tady je výstup:

Kategorie TimeGenerated Počet_
Přímý agent 2017-06-06T17:00:00Z 15
Přímý agent 2017-06-06T18:00:00Z 60
Přímý agent 2017-06-06T20:00:00Z 55
Přímý agent 2017-06-06T21:00:00Z 57
Přímý agent 2017-06-06T22:00:00Z 60
... ... ...

Ve výstupu chybí kbelík přidružený k "2017-06-06T19:00:00Z", protože pro tuto hodinu nejsou k dispozici žádná data prezenčního signálu. make-series Pomocí funkce můžete prázdným kontejnerům přiřadit výchozí hodnotu. Pro každou kategorii se vygeneruje řádek. Výstup obsahuje dva další maticové sloupce, jeden pro hodnoty a druhý pro odpovídající časové intervaly:

Heartbeat
| make-series count() default=0 on TimeGenerated in range(ago(1d), now(), 1h) by Category

Tady je výstup:

Kategorie Počet_ TimeGenerated
Přímý agent [15,60,0,55,60,57,60,...] ["2017-06-06T17:00:00.000000Z","2017-06-06T18:00:00.000000Z","2017-06-06T19:00:00.0000000Z","2017-06-06T20:00:00.0000000Z","2017-06-06T21:00:00.0000000Z",...]
... ... ...

Třetí prvek pole count_ je podle očekávání 0. Pole TimeGenerated má odpovídající časové razítko "2017-06-06T19:00:00.0000000Z". Tento formát pole je ale obtížně čitelný. Slouží mv-expand k rozšíření polí a vytvoření výstupu ve stejném formátu, jako má výstup vygenerovaný nástrojem summarize:

Heartbeat
| make-series count() default=0 on TimeGenerated in range(ago(1d), now(), 1h) by Category
| mv-expand TimeGenerated, count_
| project Category, TimeGenerated, count_

Tady je výstup:

Kategorie TimeGenerated Počet_
Přímý agent 2017-06-06T17:00:00Z 15
Přímý agent 2017-06-06T18:00:00Z 60
Přímý agent 2017-06-06T19:00:00Z 0
Přímý agent 2017-06-06T20:00:00Z 55
Přímý agent 2017-06-06T21:00:00Z 57
Přímý agent 2017-06-06T22:00:00Z 60
... ... ...

Zužte výsledky na sadu prvků: let, make_set, toscalar, in

Běžným scénářem je vybrat názvy konkrétních entit na základě sady kritérií a potom filtrovat jinou datovou sadu podle této sady entit. Můžete například najít počítače, o kterých je známo, že mají chybějící aktualizace, a identifikovat IP adresy, na které tyto počítače volají.

Tady je příklad:

let ComputersNeedingUpdate = toscalar(
    Update
    | summarize make_set(Computer)
    | project set_Computer
);
WindowsFirewall
| where Computer in (ComputersNeedingUpdate)

Spojení

Spojení můžete použít k analýze dat z více tabulek ve stejném dotazu. Spojení sloučí řádky dvou datových sad odpovídajícími hodnotami zadaných sloupců.

Tady je příklad:

SecurityEvent
| where EventID == 4624		// sign-in events
| project Computer, Account, TargetLogonId, LogonTime=TimeGenerated
| join kind= inner (
    SecurityEvent
    | where EventID == 4634		// sign-out events
    | project TargetLogonId, LogoffTime=TimeGenerated
) on TargetLogonId
| extend Duration = LogoffTime-LogonTime
| project-away TargetLogonId1
| top 10 by Duration desc

V tomto příkladu první datová sada vyfiltruje všechny události přihlášení. Tato datová sada je spojená s druhou datovou sadou, která filtruje všechny události odhlášení. Předpokládané sloupce jsou Computer, Account, TargetLogonIda TimeGenerated. Datové sady jsou korelovány sdíleným sloupcem TargetLogonId. Výstupem je jeden záznam na korelaci, který má čas přihlášení i odhlášení.

Pokud mají obě datové sady sloupce se stejným názvem, dostanou sloupce datové sady na pravé straně číslo indexu. V tomto příkladu by se výsledky zobrazily TargetLogonId s hodnotami z tabulky na levé straně a TargetLogonId1 s hodnotami z tabulky na pravé straně. V tomto případě byl druhý TargetLogonId1 sloupec odebrán pomocí operátoru .project-away

Poznámka

Pokud chcete zvýšit výkon, používejte operátor project pouze relevantní sloupce spojených datových sad.

Pomocí následující syntaxe můžete spojit dvě datové sady, ve kterých má spojený klíč jiný název mezi dvěma tabulkami:

Table1
| join ( Table2 )
on $left.key1 == $right.key2

Vyhledávací tabulky

Běžným použitím spojení je použití datatable pro mapování statických hodnot. Použití datatable může pomoct zajistit, aby byly výsledky lépe prezentovatelné. Data událostí zabezpečení můžete například rozšířit o název události pro každé ID události:

let DimTable = datatable(EventID:int, eventName:string)
  [
    4625, "Account activity",
    4688, "Process action",
    4634, "Account activity",
    4658, "The handle to an object was closed",
    4656, "A handle to an object was requested",
    4690, "An attempt was made to duplicate a handle to an object",
    4663, "An attempt was made to access an object",
    5061, "Cryptographic operation",
    5058, "Key file operation"
  ];
SecurityEvent
| join kind = inner
 DimTable on EventID
| summarize count() by eventName

Tady je výstup:

Eventname Počet_
Popisovač objektu byl zavřený. 290,995
Byl požadován popisovač objektu. 154,157
Došlo k pokusu o duplikování popisovače objektu. 144,305
Došlo k pokusu o přístup k objektu. 123,669
Kryptografická operace 153,495
Operace se souborem klíče 153,496

JSON a datové struktury

Vnořené objekty jsou objekty, které obsahují jiné objekty v poli nebo v mapě párů klíč-hodnota. Objekty jsou reprezentované jako řetězce JSON. Tato část popisuje, jak můžete pomocí formátu JSON načíst data a analyzovat vnořené objekty.

Práce s řetězci JSON

Slouží extractjson pro přístup ke konkrétnímu elementu JSON ve známé cestě. Tato funkce vyžaduje výraz path, který používá následující konvence:

  • Použijte $ odkaz na kořenovou složku.
  • Pomocí zápisu závorky nebo tečky můžete odkazovat na indexy a prvky, jak je znázorněno v následujících příkladech.

K oddělení prvků použijte závorky pro indexy a tečky:

let hosts_report='{"hosts": [{"location":"North_DC", "status":"running", "rate":5},{"location":"South_DC", "status":"stopped", "rate":3}]}';
print hosts_report
| extend status = extractjson("$.hosts[0].status", hosts_report)

Tento příklad je podobný, ale používá pouze zápis závorek:

let hosts_report='{"hosts": [{"location":"North_DC", "status":"running", "rate":5},{"location":"South_DC", "status":"stopped", "rate":3}]}';
print hosts_report
| extend status = extractjson("$['hosts'][0]['status']", hosts_report)

Pro pouze jeden prvek můžete použít pouze tečkový zápis:

let hosts_report=dynamic({"location":"North_DC", "status":"running", "rate":5});
print hosts_report
| extend status = hosts_report.status

parsejson

Nejjednodušší je přistupovat k více prvkům ve struktuře JSON jako k dynamickému objektu. Slouží parsejson k přetypování textových dat na dynamický objekt. Po převodu JSON na dynamický typ můžete k analýze dat použít další funkce.

let hosts_object = parsejson('{"hosts": [{"location":"North_DC", "status":"running", "rate":5},{"location":"South_DC", "status":"stopped", "rate":3}]}');
print hosts_object
| extend status0=hosts_object.hosts[0].status, rate1=hosts_object.hosts[1].rate

array_length

Slouží array_length k počítání počtu prvků v matici:

let hosts_object = parsejson('{"hosts": [{"location":"North_DC", "status":"running", "rate":5},{"location":"South_DC", "status":"stopped", "rate":3}]}');
print hosts_object
| extend hosts_num=array_length(hosts_object.hosts)

mv-expand

Slouží mv-expand k rozdělení vlastností objektu do samostatných řádků:

let hosts_object = parsejson('{"hosts": [{"location":"North_DC", "status":"running", "rate":5},{"location":"South_DC", "status":"stopped", "rate":3}]}');
print hosts_object
| mv-expand hosts_object.hosts[0]

Snímek obrazovky ukazuje hosts_0 s hodnotami pro polohu, stav a sazbu.

buildschema

Použijte buildschema k získání schématu, které přijímá všechny hodnoty objektu:

let hosts_object = parsejson('{"hosts": [{"location":"North_DC", "status":"running", "rate":5},{"location":"South_DC", "status":"stopped", "rate":3}]}');
print hosts_object
| summarize buildschema(hosts_object)

Výsledkem je schéma ve formátu JSON:

{
    "hosts":
    {
        "indexer":
        {
            "location": "string",
            "rate": "int",
            "status": "string"
        }
    }
}

Schéma popisuje názvy polí objektů a jejich odpovídající datové typy.

Vnořené objekty můžou mít různá schémata, jako v následujícím příkladu:

let hosts_object = parsejson('{"hosts": [{"location":"North_DC", "status":"running", "rate":5},{"status":"stopped", "rate":"3", "range":100}]}');
print hosts_object
| summarize buildschema(hosts_object)

Grafy

Následující části obsahují příklady práce s grafy při použití dotazovací jazyk Kusto.

Graf výsledků

Začněte kontrolou počtu počítačů na operační systém za poslední hodinu:

Heartbeat
| where TimeGenerated > ago(1h)
| summarize count(Computer) by OSType

Ve výchozím nastavení se výsledky zobrazují jako tabulka:

Snímek obrazovky s výsledky dotazu v tabulce

Pokud chcete zobrazit užitečnější zobrazení, vyberte Graf a pak výběrem možnosti Výsečový graf vizualizujete výsledky:

Snímek obrazovky znázorňující výsledky dotazu ve výsečovém grafu

Časové diagramy

Zobrazí průměr a 50. a 95. percentil času procesoru v intervalech jedné hodiny.

Následující dotaz vygeneruje více řad. Ve výsledcích můžete zvolit, která řada se má zobrazit v časovém diagramu.

Perf
| where TimeGenerated > ago(1d)
| where CounterName == "% Processor Time"
| summarize avg(CounterValue), percentiles(CounterValue, 50, 95)  by bin(TimeGenerated, 1h)

Vyberte možnost Zobrazení spojnicového grafu:

Snímek obrazovky znázorňující spojnicový graf s více řadami

Referenční čára

Referenční čára vám může pomoct snadno identifikovat, jestli metrika překročila určitou prahovou hodnotu. Pokud chcete do grafu přidat čáru, rozšiřte datovou sadu přidáním konstantního sloupce:

Perf
| where TimeGenerated > ago(1d)
| where CounterName == "% Processor Time"
| summarize avg(CounterValue), percentiles(CounterValue, 50, 95)  by bin(TimeGenerated, 1h)
| extend Threshold = 20

Snímek obrazovky znázorňující spojnicový graf s více řadami s referenční čárou prahové hodnoty

Více dimenzí

Více výrazů v klauzuli bysummarize vytvoří ve výsledcích více řádků. Pro každou kombinaci hodnot se vytvoří jeden řádek.

SecurityEvent
| where TimeGenerated > ago(1d)
| summarize count() by tostring(EventID), AccountType, bin(TimeGenerated, 1h)

Když zobrazíte výsledky jako graf, graf použije první sloupec z klauzule by . Následující příklad ukazuje skládaný sloupcový graf vytvořený pomocí EventID hodnoty. Rozměry musí být typu string . V tomto příkladu EventID je hodnota přetypovaná na string:

Snímek obrazovky znázorňující pruhový graf založený na sloupci EventID

Mezi sloupci můžete přepínat výběrem šipky rozevíracího seznamu pro název sloupce:

Snímek obrazovky znázorňující pruhový graf založený na sloupci AccountType se zobrazeným selektorem sloupců

Inteligentní analýza

Tato část obsahuje příklady použití funkcí inteligentní analýzy v Azure Log Analytics k analýze aktivit uživatelů. Tyto příklady můžete použít k analýze vlastních aplikací, které jsou monitorovány službou Aplikace Azure Insights, nebo použít koncepty v těchto dotazech k podobné analýze jiných dat.

Analýza kohort

Analýza kohorty sleduje aktivitu konkrétních skupin uživatelů, které se označují jako kohorty. Analýza kohorty se snaží měřit, jak je služba přitažlivá, a to měřením míry vracejících se uživatelů. Uživatelé jsou seskupené podle okamžiku, kdy službu poprvé použili. Při analýze kohort očekáváme, že během prvních sledovaných období zjistíme pokles aktivity. Každá kohorta má název podle týdne, kdy byli její členové poprvé pozorováni.

Následující příklad analyzuje počet aktivit, které uživatelé dokončili během pěti týdnů od prvního použití služby:

let startDate = startofweek(bin(datetime(2017-01-20T00:00:00Z), 1d));
let week = range Cohort from startDate to datetime(2017-03-01T00:00:00Z) step 7d;
// For each user, we find the first and last timestamp of activity
let FirstAndLastUserActivity = (end:datetime)
{
    customEvents
    | where customDimensions["sourceapp"]=="ai-loganalyticsui-prod"
    // Check 30 days back to see first time activity.
    | where timestamp > startDate - 30d
    | where timestamp < end
    | summarize min=min(timestamp), max=max(timestamp) by user_AuthenticatedId
};
let DistinctUsers = (cohortPeriod:datetime, evaluatePeriod:datetime) {
    toscalar (
    FirstAndLastUserActivity(evaluatePeriod)
    // Find members of the cohort: only users that were observed in this period for the first time.
    | where min >= cohortPeriod and min < cohortPeriod + 7d
    // Pick only the members that were active during the evaluated period or after.
    | where max > evaluatePeriod - 7d
    | summarize dcount(user_AuthenticatedId))
};
week
| where Cohort == startDate
// Finally, calculate the desired metric for each cohort. In this sample, we calculate distinct users but you can change
// this to any other metric that would measure the engagement of the cohort members.
| extend
    r0 = DistinctUsers(startDate, startDate+7d),
    r1 = DistinctUsers(startDate, startDate+14d),
    r2 = DistinctUsers(startDate, startDate+21d),
    r3 = DistinctUsers(startDate, startDate+28d),
    r4 = DistinctUsers(startDate, startDate+35d)
| union (week | where Cohort == startDate + 7d
| extend
    r0 = DistinctUsers(startDate+7d, startDate+14d),
    r1 = DistinctUsers(startDate+7d, startDate+21d),
    r2 = DistinctUsers(startDate+7d, startDate+28d),
    r3 = DistinctUsers(startDate+7d, startDate+35d) )
| union (week | where Cohort == startDate + 14d
| extend
    r0 = DistinctUsers(startDate+14d, startDate+21d),
    r1 = DistinctUsers(startDate+14d, startDate+28d),
    r2 = DistinctUsers(startDate+14d, startDate+35d) )
| union (week | where Cohort == startDate + 21d
| extend
    r0 = DistinctUsers(startDate+21d, startDate+28d),
    r1 = DistinctUsers(startDate+21d, startDate+35d) )
| union (week | where Cohort == startDate + 28d
| extend
    r0 = DistinctUsers (startDate+28d, startDate+35d) )
// Calculate the retention percentage for each cohort by weeks
| project Cohort, r0, r1, r2, r3, r4,
          p0 = r0/r0*100,
          p1 = todouble(r1)/todouble (r0)*100,
          p2 = todouble(r2)/todouble(r0)*100,
          p3 = todouble(r3)/todouble(r0)*100,
          p4 = todouble(r4)/todouble(r0)*100
| sort by Cohort asc

Tady je výstup:

Snímek obrazovky znázorňující tabulku kohort založených na aktivitě

Průběžné měsíční aktivity aktivních uživatelů a uživatelů

Následující příklad používá analýzu časových řad s funkcí series_fir . Funkci můžete použít series_fir pro výpočty posuvných oken. Ukázková aplikace, která se monitoruje, je online obchod, který sleduje aktivitu uživatelů prostřednictvím vlastních událostí. Dotaz sleduje dva typy aktivit uživatelů: AddToCart a Checkout. Definuje aktivního uživatele jako uživatele, který dokončil rezervaci alespoň jednou v konkrétní den.

let endtime = endofday(datetime(2017-03-01T00:00:00Z));
let window = 60d;
let starttime = endtime-window;
let interval = 1d;
let user_bins_to_analyze = 28;
// Create an array of filters coefficients for series_fir(). A list of '1' in our case will produce a simple sum.
let moving_sum_filter = toscalar(range x from 1 to user_bins_to_analyze step 1 | extend v=1 | summarize make_list(v)); 
// Level of engagement. Users will be counted as engaged if they completed at least this number of activities.
let min_activity = 1;
customEvents
| where timestamp > starttime
| where customDimensions["sourceapp"] == "ai-loganalyticsui-prod"
// We want to analyze users who actually checked out in our website.
| where (name == "Checkout") and user_AuthenticatedId <> ""
// Create a series of activities per user.
| make-series UserClicks=count() default=0 on timestamp
	in range(starttime, endtime-1s, interval) by user_AuthenticatedId
// Create a new column that contains a sliding sum.
// Passing 'false' as the last parameter to series_fir() prevents normalization of the calculation by the size of the window.
// For each time bin in the *RollingUserClicks* column, the value is the aggregation of the user activities in the
// 28 days that preceded the bin. For example, if a user was active once on 2016-12-31 and then inactive throughout
// January, then the value will be 1 between 2016-12-31 -> 2017-01-28 and then 0s.
| extend RollingUserClicks=series_fir(UserClicks, moving_sum_filter, false)
// Use the zip() operator to pack the timestamp with the user activities per time bin.
| project User_AuthenticatedId=user_AuthenticatedId , RollingUserClicksByDay=zip(timestamp, RollingUserClicks)
// Transpose the table and create a separate row for each combination of user and time bin (1 day).
| mv-expand RollingUserClicksByDay
| extend Timestamp=todatetime(RollingUserClicksByDay[0])
// Mark the users that qualify according to min_activity.
| extend RollingActiveUsersByDay=iff(toint(RollingUserClicksByDay[1]) >= min_activity, 1, 0)
// And finally, count the number of users per time bin.
| summarize sum(RollingActiveUsersByDay) by Timestamp
// First 28 days contain partial data, so we filter them out.
| where Timestamp > starttime + 28d
// Render as timechart.
| render timechart

Tady je výstup:

Snímek obrazovky s grafem, který zobrazuje postup aktivních uživatelů po dnech v průběhu měsíce

Následující příklad změní předchozí dotaz na opakovaně použitelnou funkci. Příklad pak použije dotaz k výpočtu průběžného chování uživatele. Aktivní uživatel v tomto dotazu je definován jako uživatel, který dokončil rezervaci alespoň jednou v konkrétní den.

let rollingDcount = (sliding_window_size: int, event_name:string)
{
    let endtime = endofday(datetime(2017-03-01T00:00:00Z));
    let window = 90d;
    let starttime = endtime-window;
    let interval = 1d;
    let moving_sum_filter = toscalar(range x from 1 to sliding_window_size step 1 | extend v=1| summarize make_list(v));    
    let min_activity = 1;
    customEvents
    | where timestamp > starttime
    | where customDimensions["sourceapp"]=="ai-loganalyticsui-prod"
    | where (name == event_name)
    | where user_AuthenticatedId <> ""
    | make-series UserClicks=count() default=0 on timestamp
		in range(starttime, endtime-1s, interval) by user_AuthenticatedId
    | extend RollingUserClicks=fir(UserClicks, moving_sum_filter, false)
    | project User_AuthenticatedId=user_AuthenticatedId , RollingUserClicksByDay=zip(timestamp, RollingUserClicks)
    | mv-expand RollingUserClicksByDay
    | extend Timestamp=todatetime(RollingUserClicksByDay[0])
    | extend RollingActiveUsersByDay=iff(toint(RollingUserClicksByDay[1]) >= min_activity, 1, 0)
    | summarize sum(RollingActiveUsersByDay) by Timestamp
    | where Timestamp > starttime + 28d
};
// Use the moving_sum_filter with bin size of 28 to count MAU.
rollingDcount(28, "Checkout")
| join
(
    // Use the moving_sum_filter with bin size of 1 to count DAU.
    rollingDcount(1, "Checkout")
)
on Timestamp
| project sum_RollingActiveUsersByDay1 *1.0 / sum_RollingActiveUsersByDay, Timestamp
| render timechart

Tady je výstup:

Snímek obrazovky s grafem, který zobrazuje uživatele v průběhu času

Regresní analýza

Tento příklad ukazuje, jak vytvořit automatizovaný detektor přerušení služeb založený výhradně na protokolech trasování aplikace. Detektor hledá neobvyklé, náhlé zvýšení relativního množství trasování chyb a upozornění v aplikaci.

K vyhodnocení stavu služby na základě dat protokolů trasování se používají dvě techniky:

  • Pomocí řady make-series můžete převést částečně strukturované textové protokoly trasování na metriku, která představuje poměr mezi kladnými a zápornými spojnicemi trasování.
  • Použijte series_fit_2lines a series_fit_line pro pokročilou detekci skoků pomocí analýzy časových řad s dvouřádkovou lineární regresí.
let startDate = startofday(datetime("2017-02-01"));
let endDate = startofday(datetime("2017-02-07"));
let minRsquare = 0.8;  // Tune the sensitivity of the detection sensor. Values close to 1 indicate very low sensitivity.
// Count all Good (Verbose + Info) and Bad (Error + Fatal + Warning) traces, per day.
traces
| where timestamp > startDate and timestamp < endDate
| summarize
    Verbose = countif(severityLevel == 0),
    Info = countif(severityLevel == 1),
    Warning = countif(severityLevel == 2),
    Error = countif(severityLevel == 3),
    Fatal = countif(severityLevel == 4) by bin(timestamp, 1d)
| extend Bad = (Error + Fatal + Warning), Good = (Verbose + Info)
// Determine the ratio of bad traces, from the total.
| extend Ratio = (todouble(Bad) / todouble(Good + Bad))*10000
| project timestamp , Ratio
// Create a time series.
| make-series RatioSeries=take_any(Ratio) default=0 on timestamp in range(startDate , endDate -1d, 1d) by 'TraceSeverity' 
// Apply a 2-line regression to the time series.
| extend (RSquare2, SplitIdx, Variance2,RVariance2,LineFit2)=series_fit_2lines(RatioSeries)
// Find out if our 2-line is trending up or down.
| extend (Slope,Interception,RSquare,Variance,RVariance,LineFit)=series_fit_line(LineFit2)
// Check whether the line fit reaches the threshold, and if the spike represents an increase (rather than a decrease).
| project PatternMatch = iff(RSquare2 > minRsquare and Slope>0, "Spike detected", "No Match")

Další kroky