Condividi tramite


Aggregare i dati in una pipeline del processore di dati

Importante

Anteprima delle operazioni di Azure IoT: abilitata da Azure Arc è attualmente disponibile in ANTEPRIMA. Non è consigliabile usare questo software di anteprima negli ambienti di produzione.

Sarà necessario distribuire una nuova installazione di Azure IoT Operations quando viene resa disponibile una versione disponibile a livello generale, non sarà possibile aggiornare un'installazione di anteprima.

Vedere le condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure per termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale.

La fase di aggregazione è una fase facoltativa, configurabile e intermedia della pipeline che consente di eseguire operazioni di campionamento inattivo e batch sui dati dei sensori di streaming su finestre temporali definite dall'utente.

Usare una fase di aggregazione per accumulare messaggi in una finestra definita e calcolare i valori di aggregazione dalle proprietà nei messaggi. La fase genera i valori aggregati come proprietà in un singolo messaggio alla fine di ogni intervallo di tempo.

  • Ogni partizione della pipeline esegue l'aggregazione indipendentemente l'una dall'altra.
  • L'output della fase è un singolo messaggio che contiene tutte le proprietà di aggregazione definite.
  • La fase elimina tutte le altre proprietà. Tuttavia, è possibile usare le funzioni Last, First o Collect per mantenere le proprietà che altrimenti verrebbero eliminate dalla fase durante l'aggregazione.
  • Per il corretto funzionamento della fase di aggregazione, la fase dell'origine dati nella pipeline deve deserializzare il messaggio in arrivo.

Prerequisiti

Per configurare e usare una fase di pipeline di aggregazione, è necessaria un'istanza distribuita del responsabile del trattamento dei dati che include il componente facoltativo del processore di dati.

Configurare la fase

La configurazione JSON della fase di aggregazione definisce i dettagli della fase. Per creare la fase, è possibile interagire con l'interfaccia utente basata su form o specificare la configurazione JSON nella scheda Avanzate :

Campo Tipo Descrizione Richiesto Valore predefinito Esempio
Nome string Nome da visualizzare nell'interfaccia utente del responsabile del trattamento dei dati. - Calculate Aggregate
Descrizione Stringa Descrizione intuitiva delle operazioni della fase di aggregazione. No Aggregation over temperature
Intervallo di tempo Durata che specifica il periodo in cui viene eseguita l'aggregazione. - 10s
Funzione Properties > Enumerazione Funzione di aggregazione da usare. - Sum
> Proprietà InputPath1 Percorso Percorso della proprietà nel messaggio in ingresso a cui applicare la funzione. - .payload.temperature
> Proprietà OutputPath2 Percorso Percorso del percorso nel messaggio in uscita per posizionare il risultato. - .payload.temperature.average

È possibile definire più configurazioni delle proprietà in una fase di aggregazione. Ad esempio, calcolare la somma della temperatura e calcolare la media di pressione.

1Percorso di input:

  • Il tipo di dati del valore della proprietà del percorso di input deve essere compatibile con il tipo di funzione definito.
  • È possibile fornire lo stesso percorso di input tra più configurazioni di aggregazione per calcolare più funzioni sulla stessa proprietà del percorso di input. Assicurarsi che i percorsi di output siano diversi per evitare di sovrascrivere i risultati.

2Percorso di output:

  • I percorsi di output possono essere uguali o diversi dal percorso di input. Usare percorsi di output diversi se si calcolano più aggregazioni nella stessa proprietà del percorso di input.
  • Configurare percorsi di output distinti per evitare la sovrascrittura dei valori aggregati.

Finestre

La finestra è l'intervallo di tempo in cui la fase accumula messaggi. Alla fine della finestra, la fase applica la funzione configurata alle proprietà del messaggio. La fase genera quindi un singolo messaggio.

Attualmente, la fase supporta solo finestre a cascata .

Le finestre a cascata sono una serie di intervalli di tempo fissi, non sovrapposti e consecutivi. La finestra inizia e termina in punti fissi nel tempo:

Diagramma che mostra 10 secondi finestre a cascata nella fase di aggregazione.

La dimensione della finestra definisce l'intervallo di tempo in cui la fase accumula i messaggi. Per definire le dimensioni della finestra, usare il modello comune Durata .

Funzioni

La fase di aggregazione supporta le funzioni seguenti per calcolare i valori aggregati sulla proprietà del messaggio definita nel percorso di input:

Funzione Descrizione
Sum Calcola la somma dei valori della proprietà nei messaggi di input.
Media Calcola la media dei valori della proprietà nei messaggi di input.
Conteggio Conta il numero di volte in cui la proprietà viene visualizzata nella finestra.
Min Calcola il valore minimo dei valori della proprietà nei messaggi di input.
Max Calcola il valore massimo dei valori della proprietà nei messaggi di input.
Last Restituisce il valore più recente dei valori della proprietà nei messaggi di input.
First Restituisce il primo valore dei valori della proprietà nei messaggi di input.
Collect Restituisce tutti i valori della proprietà nei messaggi di input.

Nella tabella seguente sono elencati i tipi di dati dei messaggi supportati da ogni funzione:

Funzione Intero Float Stringa Datetime Array Object Binario
Sum
Media
Conteggio
Min
Max
Last
First
Collect

Configurazione di esempio

L'esempio JSON seguente mostra una configurazione completa della fase di aggregazione:

{ 
    "displayName":"downSample", 
    "description":"Calculate average for production tags", 
    "window": 
    { 
        "type":"tumbling", 
        "size":"10s" 
    }, 
    "properties": 
    [ 
        { 
            "function":"average", 
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_avg" 
        }, 
        {  
            "function":"collect",  
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_all"  
        },  
        {  
            "function":"average",  
            "inputPath":".payload.pressure", 
            "outputPath":".payload.pressure"                  
        },  
        {  
            "function":"last",  
            "inputPath":".systemProperties", 
            "outputPath": ".systemProperties" 
        } 
    ] 
}

La configurazione definisce una fase di aggregazione che calcola, in una finestra di dieci secondi:

  • Temperatura media
  • Somma della temperatura
  • Somma della pressione

Esempio

Questo esempio include due messaggi di input di esempio e un messaggio di output di esempio generato usando la configurazione precedente:

Messaggio di input 1:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 10, 
        "temperature":250, 
        "pressure":30, 
        "runningState": true 
    } 
} 

Messaggio di input 2:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 11, 
        "temperature":235, 
        "pressure":25, 
        "runningState": true 
    } 
} 

Messaggio di output:

{ 
    "systemProperties":{  
        "partitionKey":"foo",  
        "partitionId":5,  
        "timestamp":"2023-01-11T10:02:07Z"  
    }, 
    "payload":{ 
        "temperature_avg":242.5, 
        "temperature_all":[250,235], 
        "pressure":27.5 
    } 
}