Configurer une étape source MQ dans un pipeline de processeur de données
Important
Opérations Azure IoT (préversion) – activé parc Azure Arc est actuellement en PRÉVERSION. Vous ne devez pas utiliser ce logiciel en préversion dans des environnements de production.
Vous devrez déployer une nouvelle installation d’Azure IoT Operations lorsqu’une version en disponibilité générale est mise à disposition, vous ne pourrez pas mettre à niveau une installation en préversion.
Pour connaître les conditions juridiques qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou plus généralement non encore en disponibilité générale, consultez l’Avenant aux conditions d’utilisation des préversions de Microsoft Azure.
L’étape source est la première étape requise dans un pipeline de processeur de données. L’étape source obtient des données dans le pipeline de traitement des données et la prépare pour un traitement ultérieur. L’étape source MQ vous permet de vous abonner aux messages d’une rubrique MQTT. Dans l’étape source, vous définissez les détails de connexion à la source MQ et établissez une configuration de partitionnement en fonction de vos besoins spécifiques en matière de traitement des données.
Prérequis
- Instance déployée du processeur de données qui inclut le composant facultatif du processeur de données.
- Une instance du répartiteur MQTT avec toutes les données brutes nécessaires disponibles est opérationnelle et accessible.
Configurer la source MQ
Pour configurer la source MQ :
- Fournissez les détails de connexion à la source MQ. Cette configuration inclut le type de la source MQ, l’URL du répartiteur MQTT, le niveau qualité de service (QoS), le type de session et les rubriques auxquelles s’abonner.
- Spécifiez la méthode d'authentification. Actuellement limité à l’authentification par nom d’utilisateur/mot de passe ou par jeton de compte de service.
Le tableau suivant décrit les paramètres de configuration source MQ :
Champ | Description | Obligatoire | Par défaut | Exemple |
---|---|---|---|---|
Nom | Nom visible par le client pour l’étape source. | Requis | NA | asset-1broker |
Description | Description visible par le client de l’étape source. | Facultatif | NA | brokerforasset-1 |
Service Broker | URL du répartiteur MQTT à laquelle se connecter. | Requis | NA | tls://aio-mq-dmqtt-frontend:8883 |
Authentification | Méthode d’authentification pour se connecter au répartiteur. Un des : None , Username/Password et Service Account Token (SAT) . |
Requis | Service Account Token (SAT) |
Service Account Token (SAT) |
Nom d’utilisateur/mot de passe > Nom d’utilisateur | Nom d’utilisateur de l’authentification par nom d’utilisateur/mot de passe | Oui | N/D | myuser |
Nom d’utilisateur/mot de passe > secret | Référence au mot de passe stocké dans Azure Key Vault. | Oui | N/D | AKV_USERNAME_PASSWORD |
QoS | Niveau QoS pour la remise des messages. | Requis | 1 | 0 |
Nettoyer la session | Défini sur FALSE pour une session persistante. |
Requis | FALSE |
FALSE |
Rubrique | Rubrique à laquelle s’abonner pour l’acquisition de données. | Requis | NA | contoso/site1/asset1 , contoso/site1/asset2 |
Pour en savoir plus sur les secrets, consultez Gérer les secrets pour votre déploiement Opérations Azure IoT (préversion).
Le processeur de données ne réorganise pas les données hors commande provenant du répartiteur MQTT. Si les données sont reçues dans le désordre par le courtier, elles le restent dans le pipeline.
Sélectionner le format de données
Dans un pipeline de processeur de données, le champ de format de l’étape source spécifie comment désérialiser les données entrantes. Par défaut, le pipeline du processeur de données utilise le raw
format qui signifie qu’il ne convertit pas les données entrantes. Pour utiliser de nombreuses fonctionnalités de processeur de données telles que Filter
des phases dans Enrich
un pipeline, vous devez désérialiser vos données à l’étape d’entrée. Vous pouvez choisir de désérialiser vos données entrantes à partir de JSON
, , MessagePack
jsonStream
, , CBOR
ou CSV
Protobuf
de mettre en forme un message lisible par le processeur de données afin d’utiliser la fonctionnalité complète du processeur de données.
Les tableaux suivants décrivent les différentes options de configuration de désérialisation :
Champ | Description | Obligatoire | Default | Valeur |
---|---|---|---|---|
Format de données | Type du format de données. | Oui | Raw |
Raw JSON jsonStream MessagePack CBOR CSV Protobuf |
Le champ Data Format
est obligatoire et sa valeur détermine les autres champs requis.
Pour désérialiser les messages CSV, vous devez également spécifier les champs suivants :
Champ | Description | Obligatoire | Valeur | Exemple |
---|---|---|---|---|
En-tête | Indique si les données CSV incluent une ligne d’en-tête. | Oui | Yes No |
No |
Nom | Nom de la colonne au format CSV | Oui | - | temp , asset |
Chemin d’accès | Le chemin d’accès jq dans le message où les informations de colonne sont ajoutées. | Non | - | Le chemin jq par défaut est le nom de colonne |
Type de données | Type de données des données de la colonne et façon dont elles sont représentées à l’intérieur du pipeline du processeur de données. | Non | String , , Float Integer , , Boolean Bytes |
Valeur par défaut : String |
Pour désérialiser les messages Protobuf, vous devez également spécifier les champs suivants :
Champ | Description | Obligatoire | Valeur | Exemple |
---|---|---|---|---|
Descripteur | Descripteur codé en base64 pour la définition protobuf. | Oui | - | Zhf... |
Message | Nom du type de message utilisé pour mettre en forme les données. | Oui | - | pipeline |
Package | Nom du package dans le descripteur où le type est défini. | Oui | - | schedulerv1 |
Remarque
Le processeur de données ne prend en charge qu’un seul type de message dans chaque fichier .proto .
Configurer le partitionnement
Le partitionnement dans un pipeline divise les données entrantes en partitions distinctes. Le partitionnement permet le parallélisme des données dans le pipeline, ce qui peut améliorer le débit et réduire la latence. Les stratégies de partitionnement affectent la façon dont les données sont traitées dans les autres étapes du pipeline. Par exemple, la dernière étape de valeur connue et l’étape d’agrégation fonctionnent sur chaque partition logique.
Pour partitionner vos données, spécifiez une stratégie de partitionnement et le nombre de partitions à utiliser :
Champ | Description | Obligatoire | Par défaut | Exemple |
---|---|---|---|---|
Type de partition | Type de partitionnement à utiliser : partition ID ou partition Key |
Requis | Key |
Key |
Expression de partition | L’expression jq à utiliser sur le message entrant pour calculer la partition ID ou la partition Key |
Requis | .topic |
.topic |
Nombre de partitions | Nombre de partitions dans un pipeline de processeur de données. | Requis | 2 |
2 |
Le processeur de données ajoute des métadonnées supplémentaires au message entrant. Consultez la vue d’ensemble de la structure des messages du processeur de données pour comprendre comment spécifier correctement l’expression de partitionnement qui s’exécute sur le message entrant. Par défaut, l’expression de partitionnement est définie sur 0
avec le Type de partition sur ID
pour envoyer toutes les données entrantes à une seule partition.
Pour obtenir des suggestions et en savoir plus, consultez Qu’est-ce que le partitionnement ?.
Exemple de configuration
L’illustration suivante présente un exemple de configuration pour l’étape :
Paramètre | Valeur |
---|---|
Nom | input data |
Service Broker | tls://aio-mq-dmqtt-frontend:8883 |
Authentification | Service Account Token (SAT) |
Rubrique | azure-iot-operations/data/opc-ua-connector-0/# |
Format de données | JSON |
Cette configuration génère ensuite des messages qui ressemblent à l’exemple suivant :
{
"Timestamp": "2023-08-10T00:54:58.6572007Z",
"MessageType": "ua-deltaframe",
"payload": {
"temperature": {
"SourceTimestamp": "2023-08-10T00:54:58.2543129Z",
"Value": 7109
},
"Tag 10": {
"SourceTimestamp": "2023-08-10T00:54:58.2543482Z",
"Value": 7109
}
},
"DataSetWriterName": "oven",
"SequenceNumber": 4660
}