Partage via


Envoyer des données depuis Azure IoT MQ (préversion) vers Data Lake Storage

Important

Opérations Azure IoT (préversion) – activé parc Azure Arc est actuellement en PRÉVERSION. Vous ne devez pas utiliser ce logiciel en préversion dans des environnements de production.

Pour connaître les conditions juridiques qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou plus généralement non encore en disponibilité générale, consultez l’Avenant aux conditions d’utilisation des préversions de Microsoft Azure.

Vous pouvez utiliser le connecteur de lac de données pour envoyer des données du répartiteur Azure IoT MQ (préversion) à un lac de données, comme Azure Data Lake Storage Gen2 (ADLSv2), Microsoft Fabric OneLake et Azure Data Explorer. Le connecteur s’abonne aux rubriques MQTT et ingère les messages dans des tables Delta dans le compte Data Lake Storage.

Prérequis

Effectuer une configuration pour envoyer des données à Microsoft Fabric OneLake à l’aide de l’identité managée

Configurez un connecteur du lac de données pour vous connecter à Microsoft Fabric OneLake à l’aide d’une identité managée.

  1. Vérifiez que les étapes des prérequis sont remplies, notamment un espace de travail Microsoft Fabric et un lakehouse. La valeur par défaut mon espace de travail ne peut pas être utilisée.

  2. Vérifiez que l’extension IoT MQ Arc est installée et configurée avec l’identité managée.

  3. Dans le Portail Azure, accédez au cluster Kubernetes connecté à Arc et sélectionnez Paramètres>Extensions. Dans la liste des extensions, recherchez le nom de votre extension IoT MQ. Le nom commence par mq-, suivi de cinq caractères aléatoires. Par exemple, mq-4jgjs.

  4. Obtenez l’ID d’application associé à l’identité managée de l’extension IoT MQ Arc, puis notez la valeur GUID. L’ID d’application est différent de l’ID d’objet ou de principal. Vous pouvez utiliser Azure CLI en recherchant l’ID d’objet de l’identité managée, puis en interrogeant l’ID d’application du principal de service associé à l’identité managée. Par exemple :

    OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv)
    az ad sp show --query appId --id $OBJECT_ID --output tsv
    

    Vous devez obtenir une sortie avec une valeur GUID :

    xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    

    Ce GUID est l’ID d’application que vous devez utiliser à l’étape suivante.

  5. Dans l’espace de travail Microsoft Fabric, utilisez Gérer l’accès, puis sélectionnez + Ajouter des personnes ou des groupes.

  6. Recherchez l’extension IoT MQ Arc par son nom « mq », puis veillez à sélectionner la valeur GUID de l’ID d’application que vous avez trouvée à l’étape précédente.

  7. Sélectionnez Contributeur en tant que rôle, puis sélectionnez Ajouter.

  8. Créez une ressource DataLakeConnector qui définit les paramètres de configuration et de point de terminaison du connecteur. Vous pouvez utiliser le YAML fourni comme exemple, mais veillez à modifier les champs suivants :

    • target.fabricOneLake.endpoint : le point de terminaison du compte Microsoft Fabric OneLake. Vous pouvez obtenir l’URL du point de terminaison à partir du lakehouse Microsoft Fabric sous Fichiers>Propriétés. L'URL doit ressembler à https://onelake.dfs.fabric.microsoft.com.
    • target.fabricOneLake.names: noms de l’espace de travail et de la lakehouse. Utilisez ce champ ou guids. Ne pas les utiliser simultanément.
      • workspaceName : nom de l’espace de travail.
      • lakehouseName: nom du lac.
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: info
      databaseFormat: delta
      target:
        fabricOneLake:
          # Example: https://onelake.dfs.fabric.microsoft.com
          endpoint: <example-endpoint-url>
          names:
            workspaceName: <example-workspace-name>
            lakehouseName: <example-lakehouse-name>
          ## OR
          # guids:
          #   workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          #   lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          fabricPath: tables
          authentication:
            systemAssignedManagedIdentity:
              audience: https://storage.azure.com/
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  9. Créez une ressource DataLakeConnectorTopicMap qui définit le mappage entre la rubrique MQTT et la table Delta dans Data Lake Storage. Vous pouvez utiliser le YAML fourni comme exemple, mais veillez à modifier les champs suivants :

    • dataLakeConnectorRef: nom de la ressource DataLakeConnector que vous avez créée précédemment.
    • clientId: identificateur unique pour votre client MQTT.
    • mqttSourceTopic: nom de la rubrique MQTT à partir de laquelle vous souhaitez que les données proviennent.
    • table.tableName: nom de la table à ajouter à la lakehouse. La table est créée automatiquement si elle n’existe pas.
    • table.schema: schéma de la table Delta qui doit correspondre au format et aux champs des messages JSON que vous envoyez à la rubrique MQTT.
  10. Appliquez les ressources DataLakeConnector et DataLakeConnectorTopicMap à votre cluster Kubernetes à l’aide de kubectl apply -f datalake-connector.yaml.

  11. Commencez à envoyer des messages JSON à la rubrique MQTT à l’aide de votre éditeur MQTT. L’instance de connecteur data lake s’abonne à la rubrique et ingère les messages dans la table Delta.

  12. À l’aide d’un navigateur, vérifiez que les données sont importées dans la lakehouse. Dans l’espace de travail Microsoft Fabric, sélectionnez votre lakehouse, puis Tableaux. Vous devez voir les données du tableau.

Table non identifiée

Si vos données s’affichent dans la tablenon identifiée non identifiée :

La cause peut ne pas être prise en charge dans le nom de la table. Le nom de la table doit être un nom de conteneur de stockage Azure valide, ce qui signifie qu’il peut contenir n’importe quelle lettre anglaise, majuscule ou minuscule, et sous la barre _, avec une longueur allant jusqu’à 256 caractères. Aucun tiret - ou caractères d’espace n’est autorisé.

Effectuer une configuration pour envoyer des données à Azure Data Lake Storage Gen2 à l’aide du jeton SAP

Configurez un connecteur data lake pour vous connecter à un compte Azure Data Lake Storage Gen2 (ADLS Gen2) à l’aide d’un jeton de signature d’accès partagé (SAP).

  1. Obtenez un jeton SAS pour un compte Azure Data Lake Storage Gen2 (ADLS Gen2). Par exemple, utilisez le portail Azure pour accéder à votre compte de stockage. Dans le menu sous Sécurité +réseau, choisissez signature d’accès partagé. Utilisez le tableau suivant pour définir les autorisations requises.

    Paramètre Valeur
    Services autorisés Objet blob
    Types de ressources autorisés Conteneur d'objet.
    Autorisations autorisées Lecture, écriture, suppression, liste, création

    Pour optimiser les privilèges minimum, vous pouvez également choisir d’obtenir la SAP pour un conteneur individuel. Pour éviter les erreurs d’authentification, assurez-vous que le conteneur correspond à la valeur table.tableName dans la configuration de la carte de rubriques.

  2. Créez un secret Kubernetes avec le jeton SAP. N’incluez pas le point d’interrogation ? qui peut être au début du jeton.

    kubectl create secret generic my-sas \
    --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \
    -n azure-iot-operations
    
  3. Créez une ressource DataLakeConnector qui définit les paramètres de configuration et de point de terminaison du connecteur. Vous pouvez utiliser le YAML fourni comme exemple, mais veillez à modifier les champs suivants :

    • endpoint: point de terminaison Data Lake Storage du compte de stockage ADLSv2 sous la forme de https://example.blob.core.windows.net. Dans le portail Azure, recherchez le point de terminaison sous compte de stockage > Settings > Endpoints > Data Lake Storage.
    • accessTokenSecretName: nom du secret Kubernetes contenant le jeton SAP (my-sas de l’exemple précédent).
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: "debug"
      databaseFormat: "delta"
      target:
        datalakeStorage:
          endpoint: "https://example.blob.core.windows.net"
          authentication:
            accessTokenSecretName: "my-sas"
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  4. Créez une ressource DataLakeConnectorTopicMap qui définit le mappage entre la rubrique MQTT et la table Delta dans Data Lake Storage. Vous pouvez utiliser le YAML fourni comme exemple, mais veillez à modifier les champs suivants :

    • dataLakeConnectorRef: nom de la ressource DataLakeConnector que vous avez créée précédemment.
    • clientId: identificateur unique pour votre client MQTT.
    • mqttSourceTopic: nom de la rubrique MQTT à partir de laquelle vous souhaitez que les données proviennent.
    • table.tableName: nom du conteneur auquel vous souhaitez ajouter dans Data Lake Storage. Si le jeton SAP est limité au compte, le conteneur est automatiquement créé s’il est manquant.
    • table.schema: schéma de la table Delta, qui doit correspondre au format et aux champs des messages JSON que vous envoyez à la rubrique MQTT.
  5. Appliquez les ressources dataLakeConnector et DataLakeConnectorTopicMap à votre cluster Kubernetes à l’aide de kubectl apply -f datalake-connector.yaml.

  6. Commencez à envoyer des messages JSON à la rubrique MQTT à l’aide de votre éditeur MQTT. L’instance de connecteur data lake s’abonne à la rubrique et ingère les messages dans la table Delta.

  7. À l’aide du portail Azure, vérifiez que la table Delta est créée. Les fichiers sont organisés par ID client, nom de l’instance du connecteur, rubrique MQTT et heure. Dans votre compte de stockage >Conteneurs, ouvrez le conteneur que vous avez spécifié dans le DataLakeConnectorTopicMap. Vérifiez _delta_log existe et que les fichiers d’analyse affichent le trafic MQTT. Ouvrez un fichier d’analyse pour confirmer que la charge utile correspond à ce qui a été envoyé et défini dans le schéma.

Utiliser l’identité managée pour l’authentification auprès d’ADLSv2

Pour utiliser l’identité managée, spécifiez-la comme seule méthode sous DataLakeConnector authentication. Utilisez az k8s-extension show pour rechercher l’ID principal de l’extension IoT MQ Arc, puis attribuez un rôle à l’identité managée qui accorde l’autorisation d’écrire dans le compte de stockage, tel que contributeur aux données Blob de stockage. Pour plus d’informations, consultez Autoriser l’accès aux objets blob à l’aide de Microsoft Entra ID.

authentication:
  systemAssignedManagedIdentity:
    audience: https://my-account.blob.core.windows.net

Configurer pour envoyer des données à Azure Data Explorer à l’aide d’une identité managée

Configurer le connecteur du lac de données pour envoyer des données au point de terminaison Azure Data Explorer à l’aide de l’identité managée

  1. Assurez-vous que les étapes des conditions préalables sont respectées, y compris un cluster Azure Data Explorer complet. L’option « cluster libre » ne fonctionne pas.

  2. Une fois le cluster créé, créez une base de données pour le stockage de vos données.

  3. Vous pouvez créer une table pour les données concernées par le Portail Azure et créer des colonnes manuellement, ou utiliser KQL dans l’onglet requête. Exemple :

    .create table thermostat (
        externalAssetId: string,
        assetName: string,
        CurrentTemperature: real,
        Pressure: real,
        MqttTopic: string,
        Timestamp: datetime
    )
    

Activer l’ingestion de streaming

Activez l’ingestion de streaming dans votre table et votre base de données. Dans l’onglet requête, exécutez la commande suivante, en remplaçant <DATABASE_NAME> par le nom de votre base de données :

.alter database <DATABASE_NAME> policy streamingingestion enable

Ajouter l’identité managée au cluster Azure Data Explorer

Pour que le connecteur s’authentifie auprès d’Azure Data Explorer, vous devez ajouter l’identité managée au cluster Azure Data Explorer.

  1. Dans le Portail Azure, accédez au cluster Kubernetes connecté à Arc et sélectionnez Paramètres>Extensions. Dans la liste des extensions, recherchez le nom de votre extension IoT MQ. Le nom commence par mq-, suivi de cinq caractères aléatoires. Par exemple, mq-4jgjs. Les noms de l’extension MQ IoT et de l’identité managée MQ sont identiques.
  2. Dans votre base de données Azure Data Explorer, sélectionnez Autorisations>Ajouter>Ingéreur. Recherchez le nom de l’identité managée MQ et ajoutez-le.

Pour plus d’informations sur l’ajout d’une autorisation, consultez Gérer les autorisations de cluster Azure Data Explorer.

Vous êtes maintenant prêt à déployer le connecteur et à envoyer des données à Azure Data Explorer.

Fichier d’un exemple de déploiement

Exemple de fichier de déploiement pour le connecteur Azure Data Explorer. Les commentaires commençant par TODO vous obligent à remplacer les paramètres de l’espace réservé par vos informations.

apiVersion: mq.iotoperations.azure.com/v1beta1
  name: my-adx-connector
  namespace: azure-iot-operations
spec:
    repository: mcr.microsoft.com/azureiotoperations/datalake
    tag: 0.4.0-preview
    pullPolicy: Always
  databaseFormat: adx
  target:
      # TODO: insert the ADX cluster endpoint
      endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
      authentication:
        systemAssignedManagedIdentity:
          audience: https://api.kusto.windows.net
  localBrokerConnection:
    endpoint: aio-mq-dmqtt-frontend:8883
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
    authentication:
      kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: adx-topicmap
  namespace: azure-iot-operations
spec:
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      # TODO: add DB and table name
      tablePath: <DATABASE_NAME>
      tableName: <TABLE_NAME>
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: MqttTopic
        format: utf8
        optional: false
        mapping: $topic
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

Cet exemple accepte les données de la rubrique azure-iot-operations/data/thermostat avec des messages au format JSON, par exemple :

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

DataLakeConnector

Un DataLakeConnector est une ressource personnalisée Kubernetes qui définit la configuration et les propriétés d’une instance de connecteur du lac de données. Un connecteur du lac de données ingère les données des rubriques MQTT dans des tables Delta dans un compte Data Lake Storage.

Le champ de spécification d’une ressource DataLakeConnector contient les sous-champs suivants :

  • protocol: version MQTT. Il peut s’agir de l’un des v5 ou v3.
  • image: le champ image spécifie l’image conteneur du module de connecteur du lac de données. Il comporte les sous-champs suivants :
    • repository: nom du registre de conteneurs et du référentiel où l’image est stockée.
    • tag: balise de l’image à utiliser.
    • pullPolicy: stratégie de tirage pour l’image. Il peut s’agir de l’un des Always, IfNotPresentou Never.
  • instances: nombre de réplicas du connecteur du lac de données à exécuter.
  • logLevel: niveau de journal pour le module de connecteur Data Lake. Il peut s’agir de l’un des trace, debug, info, warn, errorou fatal.
  • databaseFormat: format des données à ingérer dans Data Lake Storage. Il peut s’agir de l’un des delta ou parquet.
  • target: le champ cible spécifie la destination de l’ingestion de données. Il peut s’agir de datalakeStorage, fabricOneLake, adx ou localStorage.
    • datalakeStorage : spécifier la configuration et les propriétés du compte ADLSv2. Il comporte les sous-champs suivants :
      • endpoint: URL du point de terminaison du compte Data Lake Storage. N’incluez aucune barre oblique de fin /.
      • authentication: le champ d’authentification spécifie le type et les informations d’identification pour accéder au compte Data Lake Storage. Il peut s’agir de l’un des éléments suivants.
        • accessTokenSecretName: nom du secret Kubernetes pour l’utilisation de l’authentification par jeton d’accès partagé pour le compte Data Lake Storage. Ce champ est obligatoire si le type est accessToken.
        • systemAssignedManagedIdentity: pour utiliser l’identité managée système pour l’authentification. Il a un sous-champ
          • audience: chaîne sous la forme de https://<my-account-name>.blob.core.windows.net pour l’audience de jeton d’identité managée étendue au niveau du compte ou https://storage.azure.com pour n’importe quel compte de stockage.
    • fabricOneLake: spécifie la configuration et les propriétés de Microsoft Fabric OneLake. Il comporte les sous-champs suivants :
      • endpoint: URL du point de terminaison Microsoft Fabric OneLake. Il est généralement https://onelake.dfs.fabric.microsoft.com parce que c’est le point de terminaison global OneLake. Si vous utilisez un point de terminaison régional, il se trouve sous la forme de https://<region>-onelake.dfs.fabric.microsoft.com. N’incluez aucune barre oblique de fin /. Pour plus d’informations, consultez Connexion à Microsoft OneLake.
      • names: spécifie les noms de l’espace de travail et de la lakehouse. Utilisez ce champ ou guids. Ne pas les utiliser simultanément. Il comporte les sous-champs suivants :
        • workspaceName : nom de l’espace de travail.
        • lakehouseName: nom du lac.
      • guids: spécifie les GUID de l’espace de travail et du lakehouse. Utilisez ce champ ou names. Ne pas les utiliser simultanément. Il comporte les sous-champs suivants :
        • workspaceGuid: GUID de l’espace de travail.
        • lakehouseGuid: GUID de la lakehouse.
      • fabricPath: emplacement des données dans l’espace de travail Fabric. Il peut être tables ou files. S’il s’agit de tables, les données sont stockées dans l’infrastructure OneLake sous forme de tables. S’il s’agit de files, les données sont stockées dans l’infrastructure OneLake en tant que fichiers. Si c’est files, le databaseFormat doit être parquet.
      • authentication: le champ d’authentification spécifie le type et les informations d’identification pour accéder à Microsoft Fabric OneLake. Il ne peut être systemAssignedManagedIdentity pour l’instant. Il a un sous-champ :
      • systemAssignedManagedIdentity: pour utiliser l’identité managée système pour l’authentification. Il a un sous-champ
        • audience: chaîne pour l’audience du jeton d’identité managée et doit être https://storage.azure.com.
    • adx : spécifier la configuration et les propriétés de la base de données Azure Data Explorer. Il comporte les sous-champs suivants :
      • endpoint : l’URL du point de terminaison du cluster Azure Data Explorer comme https://<CLUSTER>.<REGION>.kusto.windows.net. N’incluez aucune barre oblique de fin /.
      • authentication : le champ d’authentification spécifie le type et les informations d’identification pour accéder au cluster Azure Data Explorer. Il ne peut être systemAssignedManagedIdentity pour l’instant. Il a un sous-champ :
        • systemAssignedManagedIdentity: pour utiliser l’identité managée système pour l’authentification. Il a un sous-champ
          • audience : chaîne pour l’audience du jeton d’identité managée et doit être https://api.kusto.windows.net.
    • localStorage: spécifier la configuration et les propriétés du compte de stockage local. Il comporte les sous-champs suivants :
      • volumeName: nom du volume monté dans chacun des pods de connecteur.
  • localBrokerConnection: utilisé pour remplacer la configuration de connexion par défaut au répartiteur MQ MQTT IoT. Consultez Gérer lesde connexion de répartiteur local.

DataLakeConnectorTopicMap

Un DataLakeConnectorTopicMap est une ressource personnalisée Kubernetes qui définit le mappage entre une rubrique MQTT et une table Delta dans un compte Data Lake Storage. Une ressource DataLakeConnectorTopicMap référence une ressource DataLakeConnector qui s’exécute sur le même périphérique et ingère des données de la rubrique MQTT dans la table Delta.

Le champ de spécification d’une ressource DataLakeConnectorTopicMap contient les sous-champs suivants :

  • dataLakeConnectorRef: nom de la ressource DataLakeConnector à laquelle appartient cette carte de rubriques.
  • mapping: le champ de mappage spécifie les détails et les propriétés de la rubrique MQTT et de la table Delta. Il comporte les sous-champs suivants :
    • allowedLatencySecs: latence maximale en secondes entre la réception d’un message à partir de la rubrique MQTT et son ingestion dans la table Delta. Ce champ est obligatoire.
    • clientId: identificateur unique pour le client MQTT qui s’abonne à la rubrique.
    • maxMessagesPerBatch: nombre maximal de messages à ingérer dans un lot dans la table Delta. En raison d’une restriction temporaire, cette valeur doit être inférieure à 16 si qos est défini sur 1. Ce champ est obligatoire.
    • messagePayloadType: type de charge utile envoyé à la rubrique MQTT. Il peut s’agir de l’un des json ou avro (pas encore pris en charge).
    • mqttSourceTopic: nom de la ou des rubriques MQTT auxquelles s’abonner. Prend en charge notation générique de rubrique MQTT.
    • qos: niveau de qualité de service pour l’abonnement à la rubrique MQTT. Il peut s’agir de l’un des 0 ou 1.
    • table: le champ de table spécifie la configuration et les propriétés de la table Delta dans le compte Data Lake Storage. Il comporte les sous-champs suivants :
      • tableName: nom de la table Delta à créer ou à ajouter dans le compte Data Lake Storage. Ce champ est également appelé nom de conteneur lorsqu’il est utilisé avec Azure Data Lake Storage Gen2. Il peut contenir n’importe quelle lettre minuscule de l'alphabet anglais et le trait de soulignement _, avec une longueur allant jusqu’à 256 caractères. Aucun tiret - ou caractères d’espace n’est autorisé.
      • tablePath : nom de la base de données Azure Data Explorer lors de l’utilisation du connecteur de type adx.
      • schema: schéma de la table Delta, qui doit correspondre au format et aux champs de la charge utile du message. Il s’agit d’un tableau d’objets, chacun avec les sous-champs suivants :
        • name: nom de la colonne dans la table Delta.
        • format: type de données de la colonne dans la table Delta. Il peut s’agir de l’un des boolean, int8, int16, int32, int64, uInt8, uInt16, uInt32, uInt64, float16, float32, float64, date32, timestamp, binaryou utf8. Les types non signés, tels que uInt8, ne sont pas entièrement pris en charge et sont traités comme des types signés s’ils sont spécifiés ici.
        • optional: valeur booléenne qui indique si la colonne est facultative ou obligatoire. Ce champ est facultatif et est défini par défaut sur false.
        • mapping: expression de chemin JSON qui définit comment extraire la valeur de la colonne à partir de la charge utile du message MQTT. Les mappages intégrés $client_id, $topic, $property et $received_time sont disponibles pour une utilisation en tant que colonnes pour enrichir le JSON dans le corps du message MQTT. Ce champ est obligatoire. Utilisez $property pour les propriétés utilisateur MQTT. $property.assetId représente par exemple la valeur de la propriété assetId à partir du message MQTT.

Voici un exemple de ressource DataLakeConnectorTopicMap :

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: datalake-topicmap
  namespace: azure-iot-operations
spec:
  dataLakeConnectorRef: my-datalake-connector
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      tableName: thermostat
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

Un JSON échappé comme "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}" n’est pas pris en charge et provoque l’erreur d’un convertisseur a trouvé une valeur null.

Exemple de message pour la rubrique azure-iot-operations/data/thermostat qui fonctionne avec ce schéma :

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

Auquel correspond :

externalAssetId assetName CurrentTemperature Pression mqttTopic timestamp
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx thermostat-de 5506 5506 dlc 2024-04-02T22:36:03.1827681Z

Important

Si le schéma de données est mis à jour, par exemple un type de données est modifié ou qu’un nom est modifié, la transformation des données entrantes peut cesser de fonctionner. Vous devez modifier le nom de la table de données si une modification de schéma se produit.

Delta ou parquet

Les formats delta et parquet sont pris en charge.

Gérer la connexion de répartiteur local

Comme le pont MQTT, le connecteur du lac de données agit en tant que client pour le répartiteur MQ MQTT IoT. Si vous avez personnalisé le port d’écouteur ou l’authentification de votre répartiteur MQTT IoT, remplacez également la configuration de connexion MQTT locale pour le connecteur du lac de données. Pour plus d’informations, consultez connexion de répartiteur local de pont MQTT.

Publier et souscrire à des messages MQTT en utilisant Azure IoT MQ (préversion)