Share via


Aggregera data i en azure IoT Data Processor Preview-pipeline

Viktigt!

Förhandsversion av Azure IoT Operations – aktiverad av Azure Arc finns för närvarande i FÖRHANDSVERSION. Du bör inte använda den här förhandsgranskningsprogramvaran i produktionsmiljöer.

Juridiska villkor för Azure-funktioner i betaversion, förhandsversion eller som av någon annan anledning inte har gjorts allmänt tillgängliga ännu finns i kompletterande användningsvillkor för Microsoft Azure-förhandsversioner.

Aggregeringssteget är ett valfritt, konfigurerbart mellanliggande pipelinesteg som gör att du kan köra nedsamplings- och batchåtgärder på strömmande sensordata över användardefinierade tidsfönster.

Använd en aggregeringsfas för att ackumulera meddelanden över ett definierat fönster och beräkna aggregeringsvärden från egenskaper i meddelandena. Fasen genererar de aggregerade värdena som egenskaper i ett enda meddelande i slutet av varje tidsfönster.

  • Varje pipelinepartition utför aggregering oberoende av varandra.
  • Utdata från fasen är ett enda meddelande som innehåller alla definierade aggregeringsegenskaper.
  • Fasen släpper alla andra egenskaper. Du kan dock använda funktionerna Last, First eller Collectför att bevara egenskaper som annars skulle tas bort av fasen under aggregering.
  • För att aggregeringssteget ska fungera bör datakällans fas i pipelinen deserialisera det inkommande meddelandet.

Förutsättningar

För att konfigurera och använda en sammanställd pipelinefas behöver du en distribuerad instans av Azure IoT Data Processor Preview.

Konfigurera fasen

JSON-konfigurationen för sammansättningssteget definierar detaljerna i fasen. Om du vill skapa fasen kan du antingen interagera med det formulärbaserade användargränssnittet eller ange JSON-konfigurationen på fliken Avancerat :

Fält Type Beskrivning Obligatoriskt Standardvärde Exempel
Name String Ett namn som ska visas i användargränssnittet för dataprocessorn. Ja - Calculate Aggregate
beskrivning String En användarvänlig beskrivning av vad aggregeringssteget gör. Nej Aggregation over temperature
Tidsfönster Varaktighet som anger den period under vilken aggregeringen körs. Ja - 10s
Egenskapsfunktion > Enum Den mängdfunktion som ska användas. Ja - Sum
Egenskaper > InputPath1 Sökväg Sökvägen till egenskapen i det inkommande meddelandet som funktionen ska tillämpas på. Ja - .payload.temperature
Egenskaper > OutputPath2 Sökväg Sökvägen till platsen i det utgående meddelandet för att placera resultatet. Ja - .payload.temperature.average

Du kan definiera flera egenskapskonfigurationer i en aggregeringsfas. Du kan till exempel beräkna summan av temperaturen och beräkna medelvärdet av trycket.

1Indatasökväg:

  • Datatypen för värdet för indatasökvägsegenskapen måste vara kompatibel med den typ av funktion som definierats.
  • Du kan ange samma indatasökväg i flera sammansättningskonfigurationer för att beräkna flera funktioner över samma indatasökvägsegenskap. Kontrollera att utdatasökvägarna skiljer sig åt för att undvika att skriva över resultaten.

2Utdatasökväg:

  • Utdatasökvägar kan vara samma som eller skilja sig från indatasökvägen. Använd olika utdatasökvägar om du beräknar flera aggregeringar på samma indatasökvägsegenskap.
  • Konfigurera distinkta utdatasökvägar för att undvika att skriva över aggregeringsvärden.

Windows

Fönstret är det tidsintervall under vilket fasen ackumulerar meddelanden. I slutet av fönstret tillämpar fasen den konfigurerade funktionen på meddelandeegenskaperna. Fasen genererar sedan ett enda meddelande.

För närvarande stöder fasen endast rullande fönster.

Rullande fönster är en serie med fast storlek, icke-överlappande och efterföljande tidsintervall. Fönstret startar och slutar vid fasta tidpunkter:

Diagram that shows 10 second tumbling windows in the aggregate stage.

Fönstrets storlek definierar det tidsintervall som fasen ackumulerar meddelandena över. Du definierar fönsterstorleken med hjälp av det vanliga mönstret Varaktighet .

Functions

Aggregeringssteget stöder följande funktioner för att beräkna aggregerade värden över meddelandeegenskapen som definierats i indatasökvägen:

Function beskrivning
Sum Beräknar summan av värdena för egenskapen i indatameddelandena.
Genomsnitt Beräknar medelvärdet av värdena för egenskapen i indatameddelandena.
Antal Räknar antalet gånger egenskapen visas i fönstret.
Min Beräknar det minsta värdet för värdena för egenskapen i indatameddelandena.
Max Beräknar det maximala värdet för värdena för egenskapen i indatameddelandena.
Last Returnerar det senaste värdet för värdena för egenskapen i indatameddelandena.
First Returnerar det första värdet för värdena för egenskapen i indatameddelandena.
Collect Returnera alla värden för egenskapen i indatameddelandena.

I följande tabell visas de typer av meddelandedata som stöds av varje funktion:

Funktion Integer Flyttal String Datetime Matris Objekt Binära
Sum
Medel
Antal
Min
Max
Last
First
Collect

Exempelkonfiguration

I följande JSON-exempel visas en fullständig konfiguration av aggregerade steg:

{ 
    "displayName":"downSample", 
    "description":"Calculate average for production tags", 
    "window": 
    { 
        "type":"tumbling", 
        "size":"10s" 
    }, 
    "properties": 
    [ 
        { 
            "function":"average", 
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_avg" 
        }, 
        {  
            "function":"collect",  
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_all"  
        },  
        {  
            "function":"average",  
            "inputPath":".payload.pressure", 
            "outputPath":".payload.pressure"                  
        },  
        {  
            "function":"last",  
            "inputPath":".systemProperties", 
            "outputPath": ".systemProperties" 
        } 
    ] 
}

Konfigurationen definierar ett aggregeringssteg som beräknar under ett tio sekunders fönster:

  • Medeltemperatur
  • Summa av temperatur
  • Trycksumma

Exempel

Det här exemplet innehåller två exempel på indatameddelanden och ett exempel på utdatameddelande som genererades med hjälp av den tidigare konfigurationen:

Indatameddelande 1:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 10, 
        "temperature":250, 
        "pressure":30, 
        "runningState": true 
    } 
} 

Indatameddelande 2:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 11, 
        "temperature":235, 
        "pressure":25, 
        "runningState": true 
    } 
} 

Utdatameddelande:

{ 
    "systemProperties":{  
        "partitionKey":"foo",  
        "partitionId":5,  
        "timestamp":"2023-01-11T10:02:07Z"  
    }, 
    "payload":{ 
        "temperature_avg":242.5, 
        "temperature_all":[250,235], 
        "pressure":27.5 
    } 
}