Partager via


Agréger des données dans un pipeline de processeur de données

Important

Opérations Azure IoT (préversion) – activé parc Azure Arc est actuellement en PRÉVERSION. Vous ne devez pas utiliser ce logiciel en préversion dans des environnements de production.

Vous devrez déployer une nouvelle installation d’Azure IoT Operations lorsqu’une version en disponibilité générale est mise à disposition, vous ne pourrez pas mettre à niveau une installation en préversion.

Pour connaître les conditions juridiques qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou plus généralement non encore en disponibilité générale, consultez l’Avenant aux conditions d’utilisation des préversions de Microsoft Azure.

L’index d’agrégat est un index de pipeline facultatif, configurable et intermédiaire qui vous permet d’exécuter des opérations de sous-échantillonnage et de traitement par lot sur les données de capteur de streaming au cours des fenêtres de temps définies par l’utilisateur.

Utilisez un index d’agrégat pour accumuler des messages sur une fenêtre définie et calculer des valeurs d’agrégation à partir de propriétés dans les messages. L’index émet les valeurs agrégées sous forme de propriétés dans un seul message à la fin de chaque fenêtre de temps.

  • Chaque partition de pipeline exécute l’agrégation de manière indépendante.
  • La sortie de la phase est un message unique qui contient toutes les propriétés d’agrégat définies.
  • L’index supprime toutes les autres propriétés. Toutefois, vous pouvez utiliser les fonctions Last, First ou Collect pour conserver les propriétés qui seraient autrement supprimées par la phase pendant l’agrégation.
  • Pour que l’index d’agrégat fonctionne, l’index de source de données du pipeline doit désérialiser le message entrant.

Prérequis

Pour configurer et utiliser une phase de pipeline d’agrégation, vous avez besoin d’une instance déployée du processeur de données qui inclut le composant facultatif du processeur de données.

Configurer l’index

La configuration JSON de l’index d’agrégat définit les détails de l’index. Pour créer l’index, vous pouvez interagir avec l’interface utilisateur basée sur le formulaire ou fournir la configuration JSON sous l’onglet Advanced (Avancé) :

Champ Type Description Obligatoire Par défaut Exemple
Nom Chaîne Nom à afficher dans l’interface utilisateur du processeur de données. Oui - Calculate Aggregate
Description Chaîne Description conviviale de l’action réalisée par l’index d’agrégat. Non Aggregation over temperature
Fenêtre de temps Durée qui spécifie la période pendant laquelle l’agrégation s’exécute. Oui - 10s
Propriétés > Fonction Enum Fonction d’agrégat à utiliser. Oui - Sum
Propriétés > InputPath1 Chemin d’accès Chemin d’accès à la propriété dans le message entrant à appliquer la fonction. Oui - .payload.temperature
Propriétés > OutputPath2 Chemin d’accès Chemin d’accès à l’emplacement dans le message sortant pour placer le résultat. Oui - .payload.temperature.average

Vous pouvez définir plusieurs configurations de propriétés dans un index d’agrégat. Par exemple, calculer la somme des températures et calculer la moyenne de la pression.

1Chemin d'entrée :

  • Le type de données de la valeur de la propriété de chemin d’entrée doit être compatible avec le type de fonction défini.
  • Vous pouvez fournir le même chemin d’entrée dans plusieurs configurations d’agrégation pour calculer plusieurs fonctions sur la même propriété de chemin d’entrée. Vérifiez que les chemins de sortie sont différents pour éviter de remplacer les résultats.

2Chemin de sortie :

  • Les chemins de sortie peuvent être identiques ou différents du chemin d’entrée. Utilisez d’autres chemins de sortie si vous calculez plusieurs agrégations sur la même propriété de chemin d’entrée.
  • Configurez des chemins de sortie distincts pour éviter de remplacer les valeurs d’agrégat.

Windows

La fenêtre est l’intervalle de temps au cours duquel l’index accumule les messages. À la fin de la fenêtre, l’index applique la fonction configurée aux propriétés du message. L’index émet ensuite un seul message.

Actuellement, l’index prend uniquement en charge les fenêtres bascule.

Les fenêtres bascule sont une série d’intervalles de temps consécutifs fixes, qui ne se chevauchent pas. La fenêtre démarre et se termine à des points fixes dans le temps :

Diagramme illustrant 10 secondes de fenêtres de basculement en phase d’agrégation.

La taille de la fenêtre définit l’intervalle de temps au cours duquel l’index accumule les messages. Vous définissez la taille de la fenêtre à l’aide du modèle commun de durée.

Functions

L’index d’agrégat prend en charge les fonctions suivantes pour calculer les valeurs d’agrégat sur la propriété de message définie dans le chemin d’entrée :

Fonction Description
Somme Calcule la somme des valeurs de la propriété dans les messages d’entrée.
Moyenne Calcule la moyenne des valeurs de la propriété dans les messages d’entrée.
Count Compte le nombre de fois où la propriété apparaît dans la fenêtre.
Min Calcule la valeur minimale des valeurs de la propriété dans les messages d’entrée.
Max Calcule la valeur maximale des valeurs de la propriété dans les messages d’entrée.
Dernier Retourne la dernière valeur des valeurs de la propriété dans les messages d’entrée.
Premier Retourne la première valeur des valeurs de la propriété dans les messages d’entrée.
Collect Retourne toutes les valeurs de la propriété dans les messages d’entrée.

Le tableau suivant répertorie les types de données de message pris en charge par chaque fonction :

Fonction Entier Float Chaîne DateHeure Tableau Objet Binaire
Somme
Average
Count
Min
Max
Dernier
Premier
Collect

Exemple de configuration

L’exemple JSON suivant montre une configuration complète d’index d’agrégat :

{ 
    "displayName":"downSample", 
    "description":"Calculate average for production tags", 
    "window": 
    { 
        "type":"tumbling", 
        "size":"10s" 
    }, 
    "properties": 
    [ 
        { 
            "function":"average", 
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_avg" 
        }, 
        {  
            "function":"collect",  
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_all"  
        },  
        {  
            "function":"average",  
            "inputPath":".payload.pressure", 
            "outputPath":".payload.pressure"                  
        },  
        {  
            "function":"last",  
            "inputPath":".systemProperties", 
            "outputPath": ".systemProperties" 
        } 
    ] 
}

La configuration définit un index d’agrégat qui calcule, sur une fenêtre de dix secondes :

  • La température moyenne
  • La somme des températures
  • La somme des pressions

Exemple

Cet exemple contient deux exemples de messages d’entrée et un exemple de message de sortie générés à l’aide de la configuration précédente :

Message d’entrée 1 :

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 10, 
        "temperature":250, 
        "pressure":30, 
        "runningState": true 
    } 
} 

Message d’entrée 2 :

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 11, 
        "temperature":235, 
        "pressure":25, 
        "runningState": true 
    } 
} 

Message de sortie :

{ 
    "systemProperties":{  
        "partitionKey":"foo",  
        "partitionId":5,  
        "timestamp":"2023-01-11T10:02:07Z"  
    }, 
    "payload":{ 
        "temperature_avg":242.5, 
        "temperature_all":[250,235], 
        "pressure":27.5 
    } 
}