Agréger des données dans un pipeline de processeur de données
Important
Opérations Azure IoT (préversion) – activé parc Azure Arc est actuellement en PRÉVERSION. Vous ne devez pas utiliser ce logiciel en préversion dans des environnements de production.
Vous devrez déployer une nouvelle installation d’Azure IoT Operations lorsqu’une version en disponibilité générale est mise à disposition, vous ne pourrez pas mettre à niveau une installation en préversion.
Pour connaître les conditions juridiques qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou plus généralement non encore en disponibilité générale, consultez l’Avenant aux conditions d’utilisation des préversions de Microsoft Azure.
L’index d’agrégat est un index de pipeline facultatif, configurable et intermédiaire qui vous permet d’exécuter des opérations de sous-échantillonnage et de traitement par lot sur les données de capteur de streaming au cours des fenêtres de temps définies par l’utilisateur.
Utilisez un index d’agrégat pour accumuler des messages sur une fenêtre définie et calculer des valeurs d’agrégation à partir de propriétés dans les messages. L’index émet les valeurs agrégées sous forme de propriétés dans un seul message à la fin de chaque fenêtre de temps.
- Chaque partition de pipeline exécute l’agrégation de manière indépendante.
- La sortie de la phase est un message unique qui contient toutes les propriétés d’agrégat définies.
- L’index supprime toutes les autres propriétés. Toutefois, vous pouvez utiliser les fonctions Last, First ou Collect pour conserver les propriétés qui seraient autrement supprimées par la phase pendant l’agrégation.
- Pour que l’index d’agrégat fonctionne, l’index de source de données du pipeline doit désérialiser le message entrant.
Prérequis
Pour configurer et utiliser une phase de pipeline d’agrégation, vous avez besoin d’une instance déployée du processeur de données qui inclut le composant facultatif du processeur de données.
Configurer l’index
La configuration JSON de l’index d’agrégat définit les détails de l’index. Pour créer l’index, vous pouvez interagir avec l’interface utilisateur basée sur le formulaire ou fournir la configuration JSON sous l’onglet Advanced (Avancé) :
Champ | Type | Description | Obligatoire | Par défaut | Exemple |
---|---|---|---|---|---|
Nom | Chaîne | Nom à afficher dans l’interface utilisateur du processeur de données. | Oui | - | Calculate Aggregate |
Description | Chaîne | Description conviviale de l’action réalisée par l’index d’agrégat. | Non | Aggregation over temperature |
|
Fenêtre de temps | Durée qui spécifie la période pendant laquelle l’agrégation s’exécute. | Oui | - | 10s |
|
Propriétés > Fonction | Enum | Fonction d’agrégat à utiliser. | Oui | - | Sum |
Propriétés > InputPath1 | Chemin d’accès | Chemin d’accès à la propriété dans le message entrant à appliquer la fonction. | Oui | - | .payload.temperature |
Propriétés > OutputPath2 | Chemin d’accès | Chemin d’accès à l’emplacement dans le message sortant pour placer le résultat. | Oui | - | .payload.temperature.average |
Vous pouvez définir plusieurs configurations de propriétés dans un index d’agrégat. Par exemple, calculer la somme des températures et calculer la moyenne de la pression.
1Chemin d'entrée :
- Le type de données de la valeur de la propriété de chemin d’entrée doit être compatible avec le type de fonction défini.
- Vous pouvez fournir le même chemin d’entrée dans plusieurs configurations d’agrégation pour calculer plusieurs fonctions sur la même propriété de chemin d’entrée. Vérifiez que les chemins de sortie sont différents pour éviter de remplacer les résultats.
2Chemin de sortie :
- Les chemins de sortie peuvent être identiques ou différents du chemin d’entrée. Utilisez d’autres chemins de sortie si vous calculez plusieurs agrégations sur la même propriété de chemin d’entrée.
- Configurez des chemins de sortie distincts pour éviter de remplacer les valeurs d’agrégat.
Windows
La fenêtre est l’intervalle de temps au cours duquel l’index accumule les messages. À la fin de la fenêtre, l’index applique la fonction configurée aux propriétés du message. L’index émet ensuite un seul message.
Actuellement, l’index prend uniquement en charge les fenêtres bascule.
Les fenêtres bascule sont une série d’intervalles de temps consécutifs fixes, qui ne se chevauchent pas. La fenêtre démarre et se termine à des points fixes dans le temps :
La taille de la fenêtre définit l’intervalle de temps au cours duquel l’index accumule les messages. Vous définissez la taille de la fenêtre à l’aide du modèle commun de durée.
Functions
L’index d’agrégat prend en charge les fonctions suivantes pour calculer les valeurs d’agrégat sur la propriété de message définie dans le chemin d’entrée :
Fonction | Description |
---|---|
Somme | Calcule la somme des valeurs de la propriété dans les messages d’entrée. |
Moyenne | Calcule la moyenne des valeurs de la propriété dans les messages d’entrée. |
Count | Compte le nombre de fois où la propriété apparaît dans la fenêtre. |
Min | Calcule la valeur minimale des valeurs de la propriété dans les messages d’entrée. |
Max | Calcule la valeur maximale des valeurs de la propriété dans les messages d’entrée. |
Dernier | Retourne la dernière valeur des valeurs de la propriété dans les messages d’entrée. |
Premier | Retourne la première valeur des valeurs de la propriété dans les messages d’entrée. |
Collect | Retourne toutes les valeurs de la propriété dans les messages d’entrée. |
Le tableau suivant répertorie les types de données de message pris en charge par chaque fonction :
Fonction | Entier | Float | Chaîne | DateHeure | Tableau | Objet | Binaire |
---|---|---|---|---|---|---|---|
Somme | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
Average | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
Count | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Min | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
Max | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
Dernier | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Premier | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Collect | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Exemple de configuration
L’exemple JSON suivant montre une configuration complète d’index d’agrégat :
{
"displayName":"downSample",
"description":"Calculate average for production tags",
"window":
{
"type":"tumbling",
"size":"10s"
},
"properties":
[
{
"function":"average",
"inputPath": ".payload.temperature",
"outputPath":".payload.temperature_avg"
},
{
"function":"collect",
"inputPath": ".payload.temperature",
"outputPath":".payload.temperature_all"
},
{
"function":"average",
"inputPath":".payload.pressure",
"outputPath":".payload.pressure"
},
{
"function":"last",
"inputPath":".systemProperties",
"outputPath": ".systemProperties"
}
]
}
La configuration définit un index d’agrégat qui calcule, sur une fenêtre de dix secondes :
- La température moyenne
- La somme des températures
- La somme des pressions
Exemple
Cet exemple contient deux exemples de messages d’entrée et un exemple de message de sortie générés à l’aide de la configuration précédente :
Message d’entrée 1 :
{
"systemProperties":{
"partitionKey":"foo",
"partitionId":5,
"timestamp":"2023-01-11T10:02:07Z"
},
"qos":1,
"topic":"/assets/foo/tags/bar",
"properties":{
"responseTopic":"outputs/foo/tags/bar",
"contentType": "application/json"
},
"payload":{
"humidity": 10,
"temperature":250,
"pressure":30,
"runningState": true
}
}
Message d’entrée 2 :
{
"systemProperties":{
"partitionKey":"foo",
"partitionId":5,
"timestamp":"2023-01-11T10:02:07Z"
},
"qos":1,
"topic":"/assets/foo/tags/bar",
"properties":{
"responseTopic":"outputs/foo/tags/bar",
"contentType": "application/json"
},
"payload":{
"humidity": 11,
"temperature":235,
"pressure":25,
"runningState": true
}
}
Message de sortie :
{
"systemProperties":{
"partitionKey":"foo",
"partitionId":5,
"timestamp":"2023-01-11T10:02:07Z"
},
"payload":{
"temperature_avg":242.5,
"temperature_all":[250,235],
"pressure":27.5
}
}