Configurer une étape source MQ dans un pipeline de Processeur de données Azure IoT (préversion)
Important
Opérations Azure IoT (préversion) – activé parc Azure Arc est actuellement en PRÉVERSION. Vous ne devez pas utiliser ce logiciel en préversion dans des environnements de production.
Pour connaître les conditions juridiques qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou plus généralement non encore en disponibilité générale, consultez l’Avenant aux conditions d’utilisation des préversions de Microsoft Azure.
L’étape source est la première étape requise dans un pipeline Processeur de données Azure IoT (préversion). L’étape source obtient des données dans le pipeline de traitement des données et la prépare pour un traitement ultérieur. L’étape source MQ vous permet de vous abonner aux messages d’une rubrique MQTT. Dans l’étape source, vous définissez les détails de connexion à la source MQ et établissez une configuration de partitionnement en fonction de vos besoins spécifiques en matière de traitement des données.
Prérequis
- Une instance déployée du processeur de données Azure IoT en préversion qui inclut le composant Data Processor facultatif.
- Une instance du répartiteur Azure IoT MQ Preview avec toutes les données brutes nécessaires disponibles est opérationnelle et accessible.
Configurer la source MQ
Pour configurer la source MQ :
- Fournissez les détails de connexion à la source MQ. Cette configuration inclut le type de la source MQ, l’URL du répartiteur MQTT, le niveau qualité de service (QoS), le type de session et les rubriques auxquelles s’abonner.
- Spécifiez la méthode d'authentification. Actuellement limité à l’authentification par nom d’utilisateur/mot de passe ou par jeton de compte de service.
Le tableau suivant décrit les paramètres de configuration source MQ :
Champ | Description | Obligatoire | Par défaut | Exemple |
---|---|---|---|---|
Nom | Nom visible par le client pour l’étape source. | Requis | NA | asset-1broker |
Description | Description visible par le client de l’étape source. | Facultatif | NA | brokerforasset-1 |
Service Broker | URL du répartiteur MQTT à laquelle se connecter. | Requis | NA | tls://aio-mq-dmqtt-frontend:8883 |
Authentification | Méthode d’authentification pour se connecter au répartiteur. Un des : None , Username/Password et Service Account Token (SAT) . |
Requis | Service Account Token (SAT) |
Service Account Token (SAT) |
Nom d’utilisateur/mot de passe > Nom d’utilisateur | Nom d’utilisateur de l’authentification par nom d’utilisateur/mot de passe | Oui | N/D | myuser |
Nom d’utilisateur/mot de passe > secret | Référence au mot de passe stocké dans Azure Key Vault. | Oui | N/D | AKV_USERNAME_PASSWORD |
QoS | Niveau QoS pour la remise des messages. | Requis | 1 | 0 |
Nettoyer la session | Défini sur FALSE pour une session persistante. |
Requis | FALSE |
FALSE |
Rubrique | Rubrique à laquelle s’abonner pour l’acquisition de données. | Requis | NA | contoso/site1/asset1 , contoso/site1/asset2 |
Pour en savoir plus sur les secrets, consultez Gérer les secrets pour votre déploiement Opérations Azure IoT (préversion).
Le processeur de données ne réorganise pas les données désordonnées provenant du courtier MQTT. Si les données sont reçues dans le désordre par le courtier, elles le restent dans le pipeline.
Sélectionner le format de données
Dans un pipeline de processeur de données, le champ format dans l’étape source spécifie comment désérialiser les données entrantes. Par défaut, le pipeline processeur de données utilise le format raw
qui signifie qu’il ne convertit pas les données entrantes. Pour utiliser de nombreuses fonctionnalités de processeur de données telles que les étapes Filter
ou Enrich
dans un pipeline, vous devez désérialiser vos données à l’étape d’entrée. Vous pouvez choisir de désérialiser vos données entrantes à partir des formats JSON
, jsonStream
, MessagePack
, CBOR
, CSV
ou Protobuf
dans un message lisible par le processeur de données afin d’utiliser la fonctionnalité complète du processeur de données.
Les tableaux suivants décrivent les différentes options de configuration de désérialisation :
Champ | Description | Obligatoire | Default | Valeur |
---|---|---|---|---|
Format de données | Type du format de données. | Oui | Raw |
Raw JSON jsonStream MessagePack CBOR CSV Protobuf |
Le champ Data Format
est obligatoire et sa valeur détermine les autres champs requis.
Pour désérialiser les messages CSV, vous devez également spécifier les champs suivants :
Champ | Description | Obligatoire | Valeur | Exemple |
---|---|---|---|---|
En-tête | Indique si les données CSV incluent une ligne d’en-tête. | Oui | Yes No |
No |
Nom | Nom de la colonne au format CSV | Oui | - | temp , asset |
Chemin d’accès | Le chemin d’accès jq dans le message où les informations de colonne sont ajoutées. | Non | - | Le chemin jq par défaut est le nom de colonne |
Type de données | Type de données des données de la colonne et façon dont elles sont représentées à l’intérieur du pipeline du processeur de données. | Non | String , Float , Integer , Boolean , Bytes |
Valeur par défaut : String |
Pour désérialiser les messages Protobuf, vous devez également spécifier les champs suivants :
Champ | Description | Obligatoire | Valeur | Exemple |
---|---|---|---|---|
Descripteur | Descripteur codé en base64 pour la définition protobuf. | Oui | - | Zhf... |
Message | Nom du type de message utilisé pour mettre en forme les données. | Oui | - | pipeline |
Package | Nom du package dans le descripteur où le type est défini. | Oui | - | schedulerv1 |
Remarque
Le processeur de données ne prend en charge qu’un seul type de message dans chaque fichier .proto.
Configurer le partitionnement
Le partitionnement dans un pipeline divise les données entrantes en partitions distinctes. Le partitionnement permet le parallélisme des données dans le pipeline, ce qui peut améliorer le débit et réduire la latence. Les stratégies de partitionnement affectent la façon dont les données sont traitées dans les autres étapes du pipeline. Par exemple, la dernière étape de valeur connue et l’étape d’agrégation fonctionnent sur chaque partition logique.
Pour partitionner vos données, spécifiez une stratégie de partitionnement et le nombre de partitions à utiliser :
Champ | Description | Obligatoire | Par défaut | Exemple |
---|---|---|---|---|
Type de partition | Type de partitionnement à utiliser : partition ID ou partition Key |
Requis | Key |
Key |
Expression de partition | L’expression jq à utiliser sur le message entrant pour calculer la partition ID ou la partition Key |
Requis | .topic |
.topic |
Nombre de partitions | Nombre de partitions dans un pipeline de processeur de données. | Requis | 2 |
2 |
Le processeur de données ajoute des métadonnées supplémentaires au message entrant. Pour comprendre comment spécifier correctement l’expression de partitionnement qui s’exécute sur le message entrant, consultez vue d’ensemble de la structure des messages du processeur de données. Par défaut, l’expression de partitionnement est définie sur 0
avec le Type de partition sur ID
pour envoyer toutes les données entrantes à une seule partition.
Pour obtenir des suggestions et en savoir plus, consultez Qu’est-ce que le partitionnement ?.
Exemple de configuration
L’illustration suivante présente un exemple de configuration pour l’étape :
Paramètre | Valeur |
---|---|
Nom | input data |
Service Broker | tls://aio-mq-dmqtt-frontend:8883 |
Authentification | Service Account Token (SAT) |
Rubrique | azure-iot-operations/data/opc-ua-connector-0/# |
Format de données | JSON |
Cette configuration génère ensuite des messages qui ressemblent à l’exemple suivant :
{
"Timestamp": "2023-08-10T00:54:58.6572007Z",
"MessageType": "ua-deltaframe",
"payload": {
"temperature": {
"SourceTimestamp": "2023-08-10T00:54:58.2543129Z",
"Value": 7109
},
"Tag 10": {
"SourceTimestamp": "2023-08-10T00:54:58.2543482Z",
"Value": 7109
}
},
"DataSetWriterName": "oven",
"SequenceNumber": 4660
}
Contenu connexe
Commentaires
https://aka.ms/ContentUserFeedback.
Prochainement : Tout au long de l'année 2024, nous supprimerons progressivement les GitHub Issues en tant que mécanisme de retour d'information pour le contenu et nous les remplacerons par un nouveau système de retour d'information. Pour plus d’informations, voir:Soumettre et afficher des commentaires pour