Partager via


Configurer une étape source MQ dans un pipeline de Processeur de données Azure IoT (préversion)

Important

Opérations Azure IoT (préversion) – activé parc Azure Arc est actuellement en PRÉVERSION. Vous ne devez pas utiliser ce logiciel en préversion dans des environnements de production.

Pour connaître les conditions juridiques qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou plus généralement non encore en disponibilité générale, consultez l’Avenant aux conditions d’utilisation des préversions de Microsoft Azure.

L’étape source est la première étape requise dans un pipeline Processeur de données Azure IoT (préversion). L’étape source obtient des données dans le pipeline de traitement des données et la prépare pour un traitement ultérieur. L’étape source MQ vous permet de vous abonner aux messages d’une rubrique MQTT. Dans l’étape source, vous définissez les détails de connexion à la source MQ et établissez une configuration de partitionnement en fonction de vos besoins spécifiques en matière de traitement des données.

Prérequis

  • Une instance déployée du processeur de données Azure IoT en préversion qui inclut le composant Data Processor facultatif.
  • Une instance du répartiteur Azure IoT MQ Preview avec toutes les données brutes nécessaires disponibles est opérationnelle et accessible.

Configurer la source MQ

Pour configurer la source MQ :

  • Fournissez les détails de connexion à la source MQ. Cette configuration inclut le type de la source MQ, l’URL du répartiteur MQTT, le niveau qualité de service (QoS), le type de session et les rubriques auxquelles s’abonner.
  • Spécifiez la méthode d'authentification. Actuellement limité à l’authentification par nom d’utilisateur/mot de passe ou par jeton de compte de service.

Le tableau suivant décrit les paramètres de configuration source MQ :

Champ Description Obligatoire Par défaut Exemple
Nom Nom visible par le client pour l’étape source. Requis NA asset-1broker
Description Description visible par le client de l’étape source. Facultatif NA brokerforasset-1
Service Broker URL du répartiteur MQTT à laquelle se connecter. Requis NA tls://aio-mq-dmqtt-frontend:8883
Authentification Méthode d’authentification pour se connecter au répartiteur. Un des : None, Username/Passwordet Service Account Token (SAT). Requis Service Account Token (SAT) Service Account Token (SAT)
Nom d’utilisateur/mot de passe > Nom d’utilisateur Nom d’utilisateur de l’authentification par nom d’utilisateur/mot de passe Oui N/D myuser
Nom d’utilisateur/mot de passe > secret Référence au mot de passe stocké dans Azure Key Vault. Oui N/D AKV_USERNAME_PASSWORD
QoS Niveau QoS pour la remise des messages. Requis 1 0
Nettoyer la session Défini sur FALSE pour une session persistante. Requis FALSE FALSE
Rubrique Rubrique à laquelle s’abonner pour l’acquisition de données. Requis NA contoso/site1/asset1, contoso/site1/asset2

Pour en savoir plus sur les secrets, consultez Gérer les secrets pour votre déploiement Opérations Azure IoT (préversion).

Le processeur de données ne réorganise pas les données désordonnées provenant du courtier MQTT. Si les données sont reçues dans le désordre par le courtier, elles le restent dans le pipeline.

Sélectionner le format de données

Dans un pipeline de processeur de données, le champ format dans l’étape source spécifie comment désérialiser les données entrantes. Par défaut, le pipeline processeur de données utilise le format raw qui signifie qu’il ne convertit pas les données entrantes. Pour utiliser de nombreuses fonctionnalités de processeur de données telles que les étapes Filter ou Enrich dans un pipeline, vous devez désérialiser vos données à l’étape d’entrée. Vous pouvez choisir de désérialiser vos données entrantes à partir des formats JSON, jsonStream, MessagePack, CBOR, CSV ou Protobuf dans un message lisible par le processeur de données afin d’utiliser la fonctionnalité complète du processeur de données.

Les tableaux suivants décrivent les différentes options de configuration de désérialisation :

Champ Description Obligatoire Default Valeur
Format de données Type du format de données. Oui Raw Raw JSON jsonStream MessagePack CBOR CSV Protobuf

Le champ Data Format est obligatoire et sa valeur détermine les autres champs requis.

Pour désérialiser les messages CSV, vous devez également spécifier les champs suivants :

Champ Description Obligatoire Valeur Exemple
En-tête Indique si les données CSV incluent une ligne d’en-tête. Oui Yes No No
Nom Nom de la colonne au format CSV Oui - temp, asset
Chemin d’accès Le chemin d’accès jq dans le message où les informations de colonne sont ajoutées. Non - Le chemin jq par défaut est le nom de colonne
Type de données Type de données des données de la colonne et façon dont elles sont représentées à l’intérieur du pipeline du processeur de données. Non String, Float, Integer, Boolean, Bytes Valeur par défaut : String

Pour désérialiser les messages Protobuf, vous devez également spécifier les champs suivants :

Champ Description Obligatoire Valeur Exemple
Descripteur Descripteur codé en base64 pour la définition protobuf. Oui - Zhf...
Message Nom du type de message utilisé pour mettre en forme les données. Oui - pipeline
Package Nom du package dans le descripteur où le type est défini. Oui - schedulerv1

Remarque

Le processeur de données ne prend en charge qu’un seul type de message dans chaque fichier .proto.

Configurer le partitionnement

Le partitionnement dans un pipeline divise les données entrantes en partitions distinctes. Le partitionnement permet le parallélisme des données dans le pipeline, ce qui peut améliorer le débit et réduire la latence. Les stratégies de partitionnement affectent la façon dont les données sont traitées dans les autres étapes du pipeline. Par exemple, la dernière étape de valeur connue et l’étape d’agrégation fonctionnent sur chaque partition logique.

Pour partitionner vos données, spécifiez une stratégie de partitionnement et le nombre de partitions à utiliser :

Champ Description Obligatoire Par défaut Exemple
Type de partition Type de partitionnement à utiliser : partition ID ou partition Key Requis Key Key
Expression de partition L’expression jq à utiliser sur le message entrant pour calculer la partition ID ou la partition Key Requis .topic .topic
Nombre de partitions Nombre de partitions dans un pipeline de processeur de données. Requis 2 2

Le processeur de données ajoute des métadonnées supplémentaires au message entrant. Pour comprendre comment spécifier correctement l’expression de partitionnement qui s’exécute sur le message entrant, consultez vue d’ensemble de la structure des messages du processeur de données. Par défaut, l’expression de partitionnement est définie sur 0 avec le Type de partition sur ID pour envoyer toutes les données entrantes à une seule partition.

Pour obtenir des suggestions et en savoir plus, consultez Qu’est-ce que le partitionnement ?.

Exemple de configuration

L’illustration suivante présente un exemple de configuration pour l’étape :

Paramètre Valeur
Nom input data
Service Broker tls://aio-mq-dmqtt-frontend:8883
Authentification Service Account Token (SAT)
Rubrique azure-iot-operations/data/opc-ua-connector-0/#
Format de données JSON

Cette configuration génère ensuite des messages qui ressemblent à l’exemple suivant :

{
    "Timestamp": "2023-08-10T00:54:58.6572007Z", 
    "MessageType": "ua-deltaframe",
    "payload": {
      "temperature": {
        "SourceTimestamp": "2023-08-10T00:54:58.2543129Z",
        "Value": 7109
      },
      "Tag 10": {
        "SourceTimestamp": "2023-08-10T00:54:58.2543482Z",
        "Value": 7109
      }
    },
    "DataSetWriterName": "oven",
    "SequenceNumber": 4660
}