Envoyer des données depuis Azure IoT MQ (préversion) vers Data Lake Storage
Important
Opérations Azure IoT (préversion) – activé parc Azure Arc est actuellement en PRÉVERSION. Vous ne devez pas utiliser ce logiciel en préversion dans des environnements de production.
Pour connaître les conditions juridiques qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou plus généralement non encore en disponibilité générale, consultez l’Avenant aux conditions d’utilisation des préversions de Microsoft Azure.
Vous pouvez utiliser le connecteur de lac de données pour envoyer des données du répartiteur Azure IoT MQ (préversion) à un lac de données, comme Azure Data Lake Storage Gen2 (ADLSv2), Microsoft Fabric OneLake et Azure Data Explorer. Le connecteur s’abonne aux rubriques MQTT et ingère les messages dans des tables Delta dans le compte Data Lake Storage.
Prérequis
Un compte Data Lake Storage dans Azure avec un conteneur et un dossier pour vos données. Pour plus d’informations sur la création d’un Data Lake Storage, utilisez l’une des options de démarrage rapide suivantes :
- Démarrage rapide de Microsoft Fabric OneLake :
- Créer un espace de travail , car la par défaut de mon espace de travail n’est pas prise en charge.
- Créer un lakehouse.
- Démarrage rapide d’Azure Data Lake Storage Gen2 :
- Créer un compte de stockage à utiliser avec Azure Data Lake Storage Gen2.
- Cluster Azure Data Explorer :
- Suivez les étapes du Cluster complet dans le Démarrage rapide : Créer un cluster et une base de données pour Azure Data Explorer.
- Démarrage rapide de Microsoft Fabric OneLake :
Répartiteur MQ MQTT IoT. Pour plus d’informations sur le déploiement d’un MQTT broker IoT MQ, consultez Démarrage rapide : Déployer Opérations Azure IoT (préversion) sur un cluster Kubernetes avec Arc.
Effectuer une configuration pour envoyer des données à Microsoft Fabric OneLake à l’aide de l’identité managée
Configurez un connecteur du lac de données pour vous connecter à Microsoft Fabric OneLake à l’aide d’une identité managée.
Vérifiez que les étapes des prérequis sont remplies, notamment un espace de travail Microsoft Fabric et un lakehouse. La valeur par défaut mon espace de travail ne peut pas être utilisée.
Vérifiez que l’extension IoT MQ Arc est installée et configurée avec l’identité managée.
Dans le Portail Azure, accédez au cluster Kubernetes connecté à Arc et sélectionnez Paramètres>Extensions. Dans la liste des extensions, recherchez le nom de votre extension IoT MQ. Le nom commence par
mq-
, suivi de cinq caractères aléatoires. Par exemple, mq-4jgjs.Obtenez l’ID d’application associé à l’identité managée de l’extension IoT MQ Arc, puis notez la valeur GUID. L’ID d’application est différent de l’ID d’objet ou de principal. Vous pouvez utiliser Azure CLI en recherchant l’ID d’objet de l’identité managée, puis en interrogeant l’ID d’application du principal de service associé à l’identité managée. Par exemple :
OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv) az ad sp show --query appId --id $OBJECT_ID --output tsv
Vous devez obtenir une sortie avec une valeur GUID :
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
Ce GUID est l’ID d’application que vous devez utiliser à l’étape suivante.
Dans l’espace de travail Microsoft Fabric, utilisez Gérer l’accès, puis sélectionnez + Ajouter des personnes ou des groupes.
Recherchez l’extension IoT MQ Arc par son nom « mq », puis veillez à sélectionner la valeur GUID de l’ID d’application que vous avez trouvée à l’étape précédente.
Sélectionnez Contributeur en tant que rôle, puis sélectionnez Ajouter.
Créez une ressource DataLakeConnector qui définit les paramètres de configuration et de point de terminaison du connecteur. Vous pouvez utiliser le YAML fourni comme exemple, mais veillez à modifier les champs suivants :
target.fabricOneLake.endpoint
: le point de terminaison du compte Microsoft Fabric OneLake. Vous pouvez obtenir l’URL du point de terminaison à partir du lakehouse Microsoft Fabric sous Fichiers>Propriétés. L'URL doit ressembler àhttps://onelake.dfs.fabric.microsoft.com
.target.fabricOneLake.names
: noms de l’espace de travail et de la lakehouse. Utilisez ce champ ouguids
. Ne pas les utiliser simultanément.workspaceName
: nom de l’espace de travail.lakehouseName
: nom du lac.
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: info databaseFormat: delta target: fabricOneLake: # Example: https://onelake.dfs.fabric.microsoft.com endpoint: <example-endpoint-url> names: workspaceName: <example-workspace-name> lakehouseName: <example-lakehouse-name> ## OR # guids: # workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx # lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx fabricPath: tables authentication: systemAssignedManagedIdentity: audience: https://storage.azure.com/ localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
Créez une ressource DataLakeConnectorTopicMap qui définit le mappage entre la rubrique MQTT et la table Delta dans Data Lake Storage. Vous pouvez utiliser le YAML fourni comme exemple, mais veillez à modifier les champs suivants :
dataLakeConnectorRef
: nom de la ressource DataLakeConnector que vous avez créée précédemment.clientId
: identificateur unique pour votre client MQTT.mqttSourceTopic
: nom de la rubrique MQTT à partir de laquelle vous souhaitez que les données proviennent.table.tableName
: nom de la table à ajouter à la lakehouse. La table est créée automatiquement si elle n’existe pas.table.schema
: schéma de la table Delta qui doit correspondre au format et aux champs des messages JSON que vous envoyez à la rubrique MQTT.
Appliquez les ressources DataLakeConnector et DataLakeConnectorTopicMap à votre cluster Kubernetes à l’aide de
kubectl apply -f datalake-connector.yaml
.Commencez à envoyer des messages JSON à la rubrique MQTT à l’aide de votre éditeur MQTT. L’instance de connecteur data lake s’abonne à la rubrique et ingère les messages dans la table Delta.
À l’aide d’un navigateur, vérifiez que les données sont importées dans la lakehouse. Dans l’espace de travail Microsoft Fabric, sélectionnez votre lakehouse, puis Tableaux. Vous devez voir les données du tableau.
Table non identifiée
Si vos données s’affichent dans la tablenon identifiée non identifiée :
La cause peut ne pas être prise en charge dans le nom de la table. Le nom de la table doit être un nom de conteneur de stockage Azure valide, ce qui signifie qu’il peut contenir n’importe quelle lettre anglaise, majuscule ou minuscule, et sous la barre _
, avec une longueur allant jusqu’à 256 caractères. Aucun tiret -
ou caractères d’espace n’est autorisé.
Effectuer une configuration pour envoyer des données à Azure Data Lake Storage Gen2 à l’aide du jeton SAP
Configurez un connecteur data lake pour vous connecter à un compte Azure Data Lake Storage Gen2 (ADLS Gen2) à l’aide d’un jeton de signature d’accès partagé (SAP).
Obtenez un jeton SAS pour un compte Azure Data Lake Storage Gen2 (ADLS Gen2). Par exemple, utilisez le portail Azure pour accéder à votre compte de stockage. Dans le menu sous Sécurité +réseau, choisissez signature d’accès partagé. Utilisez le tableau suivant pour définir les autorisations requises.
Paramètre Valeur Services autorisés Objet blob Types de ressources autorisés Conteneur d'objet. Autorisations autorisées Lecture, écriture, suppression, liste, création Pour optimiser les privilèges minimum, vous pouvez également choisir d’obtenir la SAP pour un conteneur individuel. Pour éviter les erreurs d’authentification, assurez-vous que le conteneur correspond à la valeur
table.tableName
dans la configuration de la carte de rubriques.Créez un secret Kubernetes avec le jeton SAP. N’incluez pas le point d’interrogation
?
qui peut être au début du jeton.kubectl create secret generic my-sas \ --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \ -n azure-iot-operations
Créez une ressource DataLakeConnector qui définit les paramètres de configuration et de point de terminaison du connecteur. Vous pouvez utiliser le YAML fourni comme exemple, mais veillez à modifier les champs suivants :
endpoint
: point de terminaison Data Lake Storage du compte de stockage ADLSv2 sous la forme dehttps://example.blob.core.windows.net
. Dans le portail Azure, recherchez le point de terminaison sous compte de stockage > Settings > Endpoints > Data Lake Storage.accessTokenSecretName
: nom du secret Kubernetes contenant le jeton SAP (my-sas
de l’exemple précédent).
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: "debug" databaseFormat: "delta" target: datalakeStorage: endpoint: "https://example.blob.core.windows.net" authentication: accessTokenSecretName: "my-sas" localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
Créez une ressource DataLakeConnectorTopicMap qui définit le mappage entre la rubrique MQTT et la table Delta dans Data Lake Storage. Vous pouvez utiliser le YAML fourni comme exemple, mais veillez à modifier les champs suivants :
dataLakeConnectorRef
: nom de la ressource DataLakeConnector que vous avez créée précédemment.clientId
: identificateur unique pour votre client MQTT.mqttSourceTopic
: nom de la rubrique MQTT à partir de laquelle vous souhaitez que les données proviennent.table.tableName
: nom du conteneur auquel vous souhaitez ajouter dans Data Lake Storage. Si le jeton SAP est limité au compte, le conteneur est automatiquement créé s’il est manquant.table.schema
: schéma de la table Delta, qui doit correspondre au format et aux champs des messages JSON que vous envoyez à la rubrique MQTT.
Appliquez les ressources dataLakeConnector et DataLakeConnectorTopicMap à votre cluster Kubernetes à l’aide de
kubectl apply -f datalake-connector.yaml
.Commencez à envoyer des messages JSON à la rubrique MQTT à l’aide de votre éditeur MQTT. L’instance de connecteur data lake s’abonne à la rubrique et ingère les messages dans la table Delta.
À l’aide du portail Azure, vérifiez que la table Delta est créée. Les fichiers sont organisés par ID client, nom de l’instance du connecteur, rubrique MQTT et heure. Dans votre compte de stockage >Conteneurs, ouvrez le conteneur que vous avez spécifié dans le DataLakeConnectorTopicMap. Vérifiez _delta_log existe et que les fichiers d’analyse affichent le trafic MQTT. Ouvrez un fichier d’analyse pour confirmer que la charge utile correspond à ce qui a été envoyé et défini dans le schéma.
Utiliser l’identité managée pour l’authentification auprès d’ADLSv2
Pour utiliser l’identité managée, spécifiez-la comme seule méthode sous DataLakeConnector authentication
. Utilisez az k8s-extension show
pour rechercher l’ID principal de l’extension IoT MQ Arc, puis attribuez un rôle à l’identité managée qui accorde l’autorisation d’écrire dans le compte de stockage, tel que contributeur aux données Blob de stockage. Pour plus d’informations, consultez Autoriser l’accès aux objets blob à l’aide de Microsoft Entra ID.
authentication:
systemAssignedManagedIdentity:
audience: https://my-account.blob.core.windows.net
Configurer pour envoyer des données à Azure Data Explorer à l’aide d’une identité managée
Configurer le connecteur du lac de données pour envoyer des données au point de terminaison Azure Data Explorer à l’aide de l’identité managée
Assurez-vous que les étapes des conditions préalables sont respectées, y compris un cluster Azure Data Explorer complet. L’option « cluster libre » ne fonctionne pas.
Une fois le cluster créé, créez une base de données pour le stockage de vos données.
Vous pouvez créer une table pour les données concernées par le Portail Azure et créer des colonnes manuellement, ou utiliser KQL dans l’onglet requête. Exemple :
.create table thermostat ( externalAssetId: string, assetName: string, CurrentTemperature: real, Pressure: real, MqttTopic: string, Timestamp: datetime )
Activer l’ingestion de streaming
Activez l’ingestion de streaming dans votre table et votre base de données. Dans l’onglet requête, exécutez la commande suivante, en remplaçant <DATABASE_NAME>
par le nom de votre base de données :
.alter database <DATABASE_NAME> policy streamingingestion enable
Ajouter l’identité managée au cluster Azure Data Explorer
Pour que le connecteur s’authentifie auprès d’Azure Data Explorer, vous devez ajouter l’identité managée au cluster Azure Data Explorer.
- Dans le Portail Azure, accédez au cluster Kubernetes connecté à Arc et sélectionnez Paramètres>Extensions. Dans la liste des extensions, recherchez le nom de votre extension IoT MQ. Le nom commence par
mq-
, suivi de cinq caractères aléatoires. Par exemple, mq-4jgjs. Les noms de l’extension MQ IoT et de l’identité managée MQ sont identiques. - Dans votre base de données Azure Data Explorer, sélectionnez Autorisations>Ajouter>Ingéreur. Recherchez le nom de l’identité managée MQ et ajoutez-le.
Pour plus d’informations sur l’ajout d’une autorisation, consultez Gérer les autorisations de cluster Azure Data Explorer.
Vous êtes maintenant prêt à déployer le connecteur et à envoyer des données à Azure Data Explorer.
Fichier d’un exemple de déploiement
Exemple de fichier de déploiement pour le connecteur Azure Data Explorer. Les commentaires commençant par TODO
vous obligent à remplacer les paramètres de l’espace réservé par vos informations.
apiVersion: mq.iotoperations.azure.com/v1beta1
name: my-adx-connector
namespace: azure-iot-operations
spec:
repository: mcr.microsoft.com/azureiotoperations/datalake
tag: 0.4.0-preview
pullPolicy: Always
databaseFormat: adx
target:
# TODO: insert the ADX cluster endpoint
endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
authentication:
systemAssignedManagedIdentity:
audience: https://api.kusto.windows.net
localBrokerConnection:
endpoint: aio-mq-dmqtt-frontend:8883
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
authentication:
kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: adx-topicmap
namespace: azure-iot-operations
spec:
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
# TODO: add DB and table name
tablePath: <DATABASE_NAME>
tableName: <TABLE_NAME>
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: MqttTopic
format: utf8
optional: false
mapping: $topic
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
Cet exemple accepte les données de la rubrique azure-iot-operations/data/thermostat
avec des messages au format JSON, par exemple :
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
DataLakeConnector
Un DataLakeConnector est une ressource personnalisée Kubernetes qui définit la configuration et les propriétés d’une instance de connecteur du lac de données. Un connecteur du lac de données ingère les données des rubriques MQTT dans des tables Delta dans un compte Data Lake Storage.
Le champ de spécification d’une ressource DataLakeConnector contient les sous-champs suivants :
protocol
: version MQTT. Il peut s’agir de l’un desv5
ouv3
.image
: le champ image spécifie l’image conteneur du module de connecteur du lac de données. Il comporte les sous-champs suivants :repository
: nom du registre de conteneurs et du référentiel où l’image est stockée.tag
: balise de l’image à utiliser.pullPolicy
: stratégie de tirage pour l’image. Il peut s’agir de l’un desAlways
,IfNotPresent
ouNever
.
instances
: nombre de réplicas du connecteur du lac de données à exécuter.logLevel
: niveau de journal pour le module de connecteur Data Lake. Il peut s’agir de l’un destrace
,debug
,info
,warn
,error
oufatal
.databaseFormat
: format des données à ingérer dans Data Lake Storage. Il peut s’agir de l’un desdelta
ouparquet
.target
: le champ cible spécifie la destination de l’ingestion de données. Il peut s’agir dedatalakeStorage
,fabricOneLake
,adx
oulocalStorage
.datalakeStorage
: spécifier la configuration et les propriétés du compte ADLSv2. Il comporte les sous-champs suivants :endpoint
: URL du point de terminaison du compte Data Lake Storage. N’incluez aucune barre oblique de fin/
.authentication
: le champ d’authentification spécifie le type et les informations d’identification pour accéder au compte Data Lake Storage. Il peut s’agir de l’un des éléments suivants.accessTokenSecretName
: nom du secret Kubernetes pour l’utilisation de l’authentification par jeton d’accès partagé pour le compte Data Lake Storage. Ce champ est obligatoire si le type estaccessToken
.systemAssignedManagedIdentity
: pour utiliser l’identité managée système pour l’authentification. Il a un sous-champaudience
: chaîne sous la forme dehttps://<my-account-name>.blob.core.windows.net
pour l’audience de jeton d’identité managée étendue au niveau du compte ouhttps://storage.azure.com
pour n’importe quel compte de stockage.
fabricOneLake
: spécifie la configuration et les propriétés de Microsoft Fabric OneLake. Il comporte les sous-champs suivants :endpoint
: URL du point de terminaison Microsoft Fabric OneLake. Il est généralementhttps://onelake.dfs.fabric.microsoft.com
parce que c’est le point de terminaison global OneLake. Si vous utilisez un point de terminaison régional, il se trouve sous la forme dehttps://<region>-onelake.dfs.fabric.microsoft.com
. N’incluez aucune barre oblique de fin/
. Pour plus d’informations, consultez Connexion à Microsoft OneLake.names
: spécifie les noms de l’espace de travail et de la lakehouse. Utilisez ce champ ouguids
. Ne pas les utiliser simultanément. Il comporte les sous-champs suivants :workspaceName
: nom de l’espace de travail.lakehouseName
: nom du lac.
guids
: spécifie les GUID de l’espace de travail et du lakehouse. Utilisez ce champ ounames
. Ne pas les utiliser simultanément. Il comporte les sous-champs suivants :workspaceGuid
: GUID de l’espace de travail.lakehouseGuid
: GUID de la lakehouse.
fabricPath
: emplacement des données dans l’espace de travail Fabric. Il peut êtretables
oufiles
. S’il s’agit detables
, les données sont stockées dans l’infrastructure OneLake sous forme de tables. S’il s’agit defiles
, les données sont stockées dans l’infrastructure OneLake en tant que fichiers. Si c’estfiles
, ledatabaseFormat
doit êtreparquet
.authentication
: le champ d’authentification spécifie le type et les informations d’identification pour accéder à Microsoft Fabric OneLake. Il ne peut êtresystemAssignedManagedIdentity
pour l’instant. Il a un sous-champ :systemAssignedManagedIdentity
: pour utiliser l’identité managée système pour l’authentification. Il a un sous-champaudience
: chaîne pour l’audience du jeton d’identité managée et doit êtrehttps://storage.azure.com
.
adx
: spécifier la configuration et les propriétés de la base de données Azure Data Explorer. Il comporte les sous-champs suivants :endpoint
: l’URL du point de terminaison du cluster Azure Data Explorer commehttps://<CLUSTER>.<REGION>.kusto.windows.net
. N’incluez aucune barre oblique de fin/
.authentication
: le champ d’authentification spécifie le type et les informations d’identification pour accéder au cluster Azure Data Explorer. Il ne peut êtresystemAssignedManagedIdentity
pour l’instant. Il a un sous-champ :systemAssignedManagedIdentity
: pour utiliser l’identité managée système pour l’authentification. Il a un sous-champaudience
: chaîne pour l’audience du jeton d’identité managée et doit êtrehttps://api.kusto.windows.net
.
localStorage
: spécifier la configuration et les propriétés du compte de stockage local. Il comporte les sous-champs suivants :volumeName
: nom du volume monté dans chacun des pods de connecteur.
localBrokerConnection
: utilisé pour remplacer la configuration de connexion par défaut au répartiteur MQ MQTT IoT. Consultez Gérer lesde connexion de répartiteur local.
DataLakeConnectorTopicMap
Un DataLakeConnectorTopicMap est une ressource personnalisée Kubernetes qui définit le mappage entre une rubrique MQTT et une table Delta dans un compte Data Lake Storage. Une ressource DataLakeConnectorTopicMap référence une ressource DataLakeConnector qui s’exécute sur le même périphérique et ingère des données de la rubrique MQTT dans la table Delta.
Le champ de spécification d’une ressource DataLakeConnectorTopicMap contient les sous-champs suivants :
dataLakeConnectorRef
: nom de la ressource DataLakeConnector à laquelle appartient cette carte de rubriques.mapping
: le champ de mappage spécifie les détails et les propriétés de la rubrique MQTT et de la table Delta. Il comporte les sous-champs suivants :allowedLatencySecs
: latence maximale en secondes entre la réception d’un message à partir de la rubrique MQTT et son ingestion dans la table Delta. Ce champ est obligatoire.clientId
: identificateur unique pour le client MQTT qui s’abonne à la rubrique.maxMessagesPerBatch
: nombre maximal de messages à ingérer dans un lot dans la table Delta. En raison d’une restriction temporaire, cette valeur doit être inférieure à 16 siqos
est défini sur 1. Ce champ est obligatoire.messagePayloadType
: type de charge utile envoyé à la rubrique MQTT. Il peut s’agir de l’un desjson
ouavro
(pas encore pris en charge).mqttSourceTopic
: nom de la ou des rubriques MQTT auxquelles s’abonner. Prend en charge notation générique de rubrique MQTT.qos
: niveau de qualité de service pour l’abonnement à la rubrique MQTT. Il peut s’agir de l’un des 0 ou 1.table
: le champ de table spécifie la configuration et les propriétés de la table Delta dans le compte Data Lake Storage. Il comporte les sous-champs suivants :tableName
: nom de la table Delta à créer ou à ajouter dans le compte Data Lake Storage. Ce champ est également appelé nom de conteneur lorsqu’il est utilisé avec Azure Data Lake Storage Gen2. Il peut contenir n’importe quelle lettre minuscule de l'alphabet anglais et le trait de soulignement_
, avec une longueur allant jusqu’à 256 caractères. Aucun tiret-
ou caractères d’espace n’est autorisé.tablePath
: nom de la base de données Azure Data Explorer lors de l’utilisation du connecteur de typeadx
.schema
: schéma de la table Delta, qui doit correspondre au format et aux champs de la charge utile du message. Il s’agit d’un tableau d’objets, chacun avec les sous-champs suivants :name
: nom de la colonne dans la table Delta.format
: type de données de la colonne dans la table Delta. Il peut s’agir de l’un desboolean
,int8
,int16
,int32
,int64
,uInt8
,uInt16
,uInt32
,uInt64
,float16
,float32
,float64
,date32
,timestamp
,binary
ouutf8
. Les types non signés, tels queuInt8
, ne sont pas entièrement pris en charge et sont traités comme des types signés s’ils sont spécifiés ici.optional
: valeur booléenne qui indique si la colonne est facultative ou obligatoire. Ce champ est facultatif et est défini par défaut sur false.mapping
: expression de chemin JSON qui définit comment extraire la valeur de la colonne à partir de la charge utile du message MQTT. Les mappages intégrés$client_id
,$topic
,$property
et$received_time
sont disponibles pour une utilisation en tant que colonnes pour enrichir le JSON dans le corps du message MQTT. Ce champ est obligatoire. Utilisez $property pour les propriétés utilisateur MQTT. $property.assetId représente par exemple la valeur de la propriété assetId à partir du message MQTT.
Voici un exemple de ressource DataLakeConnectorTopicMap :
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: datalake-topicmap
namespace: azure-iot-operations
spec:
dataLakeConnectorRef: my-datalake-connector
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
tableName: thermostat
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
Un JSON échappé comme "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}"
n’est pas pris en charge et provoque l’erreur d’un convertisseur a trouvé une valeur null.
Exemple de message pour la rubrique azure-iot-operations/data/thermostat
qui fonctionne avec ce schéma :
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
Auquel correspond :
externalAssetId | assetName | CurrentTemperature | Pression | mqttTopic | timestamp |
---|---|---|---|---|---|
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx | thermostat-de | 5506 | 5506 | dlc | 2024-04-02T22:36:03.1827681Z |
Important
Si le schéma de données est mis à jour, par exemple un type de données est modifié ou qu’un nom est modifié, la transformation des données entrantes peut cesser de fonctionner. Vous devez modifier le nom de la table de données si une modification de schéma se produit.
Delta ou parquet
Les formats delta et parquet sont pris en charge.
Gérer la connexion de répartiteur local
Comme le pont MQTT, le connecteur du lac de données agit en tant que client pour le répartiteur MQ MQTT IoT. Si vous avez personnalisé le port d’écouteur ou l’authentification de votre répartiteur MQTT IoT, remplacez également la configuration de connexion MQTT locale pour le connecteur du lac de données. Pour plus d’informations, consultez connexion de répartiteur local de pont MQTT.
Contenu connexe
Publier et souscrire à des messages MQTT en utilisant Azure IoT MQ (préversion)
Commentaires
https://aka.ms/ContentUserFeedback.
Bientôt disponible : Tout au long de l’année 2024, nous abandonnerons progressivement le mécanisme de retour d’information GitHub Issues pour le remplacer par un nouveau système de commentaires. Pour plus d’informations, consultez :Soumettre et afficher des commentaires pour