Inviare dati dall'anteprima MQ di Azure IoT a Data Lake Archiviazione
Importante
Anteprima delle operazioni di Azure IoT: abilitata da Azure Arc è attualmente disponibile in ANTEPRIMA. Non è consigliabile usare questo software di anteprima negli ambienti di produzione.
Vedere le condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure per termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale.
È possibile usare il connettore Data Lake per inviare dati dal broker di anteprima di Azure IoT MQ a un data lake, ad esempio Azure Data Lake Archiviazione Gen2 (ADLSv2), Microsoft Fabric OneLake e Azure Esplora dati. Il connettore sottoscrive gli argomenti MQTT e inserisce i messaggi nelle tabelle Delta nell'account data lake Archiviazione.
Prerequisiti
Un account di Archiviazione Data Lake in Azure con un contenitore e una cartella per i dati. Per altre informazioni sulla creazione di un Archiviazione Data Lake, usare una delle opzioni di avvio rapido seguenti:
- Guida introduttiva a OneLake per Microsoft Fabric:
- Creare un'area di lavoro perché l'area di lavoro predefinita non è supportata.
- Creare una lakehouse.
- Avvio rapido di Azure Data Lake Archiviazione Gen2:
- Crea un account di archiviazione da usare con Azure Data Lake Storage Gen2.
- Cluster Esplora dati di Azure:
- Seguire i passaggi del cluster completo nella guida introduttiva: Creare un cluster e un database di Azure Esplora dati.
- Guida introduttiva a OneLake per Microsoft Fabric:
Broker MQTT IoT. Per altre informazioni su come distribuire un broker MQTT di IoT MQTT, vedere Avvio rapido: Distribuire l'anteprima delle operazioni di Azure IoT in un cluster Kubernetes abilitato per Arc.
Configurare per inviare dati a Microsoft Fabric OneLake usando l'identità gestita
Configurare un connettore Data Lake per connettersi a Microsoft Fabric OneLake usando l'identità gestita.
Assicurarsi che i passaggi descritti nei prerequisiti siano soddisfatti, inclusa un'area di lavoro di Microsoft Fabric e una lakehouse. Non è possibile usare l'area di lavoro predefinita.
Assicurarsi che l'estensione MQ Arc di IoT sia installata e configurata con l'identità gestita.
In portale di Azure passare al cluster Kubernetes connesso ad Arc e selezionare Impostazioni> Estensioni. Nell'elenco delle estensioni cercare il nome dell'estensione MQ IoT. Il nome inizia con seguito da
mq-
cinque caratteri casuali. Ad esempio, mq-4jgjs.Ottenere l'ID app associato all'identità gestita dell'estensione MQ Arc di IoT e annotare il valore GUID. L'ID app è diverso dall'oggetto o dall'ID entità. È possibile usare l'interfaccia della riga di comando di Azure individuando l'ID oggetto dell'identità gestita e quindi eseguendo una query sull'ID app dell'entità servizio associata all'identità gestita. Ad esempio:
OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv) az ad sp show --query appId --id $OBJECT_ID --output tsv
Dovrebbe essere visualizzato un output con un valore GUID:
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
Questo GUID è l'ID app che è necessario usare nel passaggio successivo.
Nell'area di lavoro Di Microsoft Fabric usare Gestisci accesso, quindi selezionare + Aggiungi persone o gruppi.
Cercare l'estensione IoT MQ Arc in base al nome "mq" e assicurarsi di selezionare il valore GUID DELL'ID app trovato nel passaggio precedente.
Selezionare Collaboratore come ruolo, quindi selezionare Aggiungi.
Creare una risorsa DataLake Connessione or che definisce le impostazioni di configurazione ed endpoint per il connettore. È possibile usare yaml fornito come esempio, ma assicurarsi di modificare i campi seguenti:
target.fabricOneLake.endpoint
: endpoint dell'account OneLake di Microsoft Fabric. È possibile ottenere l'URL dell'endpoint da Microsoft Fabric lakehouse in Proprietà file>. L'URL dovrebbe essere similehttps://onelake.dfs.fabric.microsoft.com
a .target.fabricOneLake.names
: nomi dell'area di lavoro e della lakehouse. Usare questo campo oguids
. Non usare entrambi.workspaceName
: nome dell'area di lavoro.lakehouseName
: nome della lakehouse.
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: info databaseFormat: delta target: fabricOneLake: # Example: https://onelake.dfs.fabric.microsoft.com endpoint: <example-endpoint-url> names: workspaceName: <example-workspace-name> lakehouseName: <example-lakehouse-name> ## OR # guids: # workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx # lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx fabricPath: tables authentication: systemAssignedManagedIdentity: audience: https://storage.azure.com/ localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
Creare una risorsa DataLake Connessione orTopicMap che definisce il mapping tra l'argomento MQTT e la tabella Delta nel data lake Archiviazione. È possibile usare yaml fornito come esempio, ma assicurarsi di modificare i campi seguenti:
dataLakeConnectorRef
: nome della risorsa DataLake Connessione or creata in precedenza.clientId
: identificatore univoco per il client MQTT.mqttSourceTopic
: nome dell'argomento MQTT da cui si vogliono ottenere i dati.table.tableName
: nome della tabella a cui si desidera aggiungere nella lakehouse. La tabella viene creata automaticamente se non esiste.table.schema
: schema della tabella Delta che deve corrispondere al formato e ai campi dei messaggi JSON inviati all'argomento MQTT.
Applicare le risorse DataLake Connessione or e DataLake Connessione orTopicMap al cluster Kubernetes usando
kubectl apply -f datalake-connector.yaml
.Iniziare a inviare messaggi JSON all'argomento MQTT usando il server di pubblicazione MQTT. L'istanza del connettore Data Lake sottoscrive l'argomento e inserisce i messaggi nella tabella Delta.
Usando un browser, verificare che i dati vengano importati nel lakehouse. Nell'area di lavoro Microsoft Fabric selezionare il lakehouse e quindi Tabelle. I dati verranno visualizzati nella tabella.
Tabella non identificata
Se i dati vengono visualizzati nella tabella Non identificata :
La causa potrebbe non essere supportata nel nome della tabella. Il nome della tabella deve essere un nome di contenitore valido Archiviazione di Azure che significa che può contenere qualsiasi lettera inglese, maiuscola o minuscola e barra inferiore_
, con lunghezza massima di 256 caratteri. Non sono consentiti trattini -
o spazi.
Configurare per inviare dati ad Azure Data Lake Archiviazione Gen2 usando il token di firma di accesso condiviso
Configurare un connettore Data Lake per connettersi a un account Azure Data Lake Archiviazione Gen2 (ADLS Gen2) usando un token di firma di accesso condiviso.
Ottenere un token di firma di accesso condiviso per un account Azure Data Lake Archiviazione Gen2 (ADLS Gen2). Ad esempio, usare il portale di Azure per passare all'account di archiviazione. Nel menu in Sicurezza e rete scegliere Firma di accesso condiviso. Usare la tabella seguente per impostare le autorizzazioni necessarie.
Parametro Valore Servizi consentiti BLOB Tipi di risorse consentiti Oggetto, contenitore Autorizzazioni consentite Lettura, scrittura, eliminazione, elenco, creazione Per ottimizzare i privilegi minimi, è anche possibile scegliere di ottenere la firma di accesso condiviso per un singolo contenitore. Per evitare errori di autenticazione, assicurarsi che il contenitore corrisponda al
table.tableName
valore nella configurazione della mappa dell'argomento.Creare un segreto Kubernetes con il token di firma di accesso condiviso. Non includere il punto interrogativo
?
che potrebbe trovarsi all'inizio del token.kubectl create secret generic my-sas \ --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \ -n azure-iot-operations
Creare una risorsa DataLake Connessione or che definisce le impostazioni di configurazione ed endpoint per il connettore. È possibile usare yaml fornito come esempio, ma assicurarsi di modificare i campi seguenti:
endpoint
: endpoint di data lake Archiviazione dell'account di archiviazione ADLSv2 sotto forma dihttps://example.blob.core.windows.net
. In portale di Azure trovare l'endpoint in Archiviazione account > Impostazioni > Endpoint > Data Lake Archiviazione.accessTokenSecretName
: nome del segreto Kubernetes contenente il token di firma di accesso condiviso (my-sas
dall'esempio precedente).
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: "debug" databaseFormat: "delta" target: datalakeStorage: endpoint: "https://example.blob.core.windows.net" authentication: accessTokenSecretName: "my-sas" localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
Creare una risorsa DataLake Connessione orTopicMap che definisce il mapping tra l'argomento MQTT e la tabella Delta nel data lake Archiviazione. È possibile usare yaml fornito come esempio, ma assicurarsi di modificare i campi seguenti:
dataLakeConnectorRef
: nome della risorsa DataLake Connessione or creata in precedenza.clientId
: identificatore univoco per il client MQTT.mqttSourceTopic
: nome dell'argomento MQTT da cui si vogliono ottenere i dati.table.tableName
: nome del contenitore a cui si vuole aggiungere nel data lake Archiviazione. Se l'ambito del token di firma di accesso condiviso è l'account, il contenitore viene creato automaticamente se mancante.table.schema
: schema della tabella Delta, che deve corrispondere al formato e ai campi dei messaggi JSON inviati all'argomento MQTT.
Applicare le risorse DataLake Connessione or e DataLake Connessione orTopicMap al cluster Kubernetes usando
kubectl apply -f datalake-connector.yaml
.Iniziare a inviare messaggi JSON all'argomento MQTT usando il server di pubblicazione MQTT. L'istanza del connettore Data Lake sottoscrive l'argomento e inserisce i messaggi nella tabella Delta.
Usando portale di Azure, verificare che la tabella Delta sia stata creata. I file sono organizzati in base all'ID client, al nome dell'istanza del connettore, all'argomento MQTT e all'ora. Nei contenitori dell'account >di archiviazione aprire il contenitore specificato in DataLake Connessione orTopicMap. Verificare _delta_log esistano file di _delta_log e i file di _delta_log mostrano il traffico MQTT. Aprire un file di classe per verificare che il payload corrisponda a ciò che è stato inviato e definito nello schema.
Usare l'identità gestita per l'autenticazione ad ADLSv2
Per usare l'identità gestita, specificarla come unico metodo in DataLake Connessione or authentication
. Usare az k8s-extension show
per trovare l'ID entità per l'estensione IoT MQ Arc, quindi assegnare un ruolo all'identità gestita che concede l'autorizzazione per scrivere nell'account di archiviazione, ad esempio Archiviazione Collaboratore dati BLOB. Per altre informazioni, vedere Autorizzare l'accesso ai BLOB usando Microsoft Entra ID.
authentication:
systemAssignedManagedIdentity:
audience: https://my-account.blob.core.windows.net
Configurare per inviare dati ad Azure Esplora dati usando l'identità gestita
Configurare il connettore Data Lake per inviare dati a un endpoint di Azure Esplora dati usando l'identità gestita.
Assicurarsi che i passaggi descritti nei prerequisiti siano soddisfatti, incluso un cluster completo di Azure Esplora dati. L'opzione "cluster gratuito" non funziona.
Dopo aver creato il cluster, creare un database per archiviare i dati.
È possibile creare una tabella per i dati specificati tramite il portale di Azure e creare colonne manualmente oppure è possibile usare KQL nella scheda query. Per esempio:
.create table thermostat ( externalAssetId: string, assetName: string, CurrentTemperature: real, Pressure: real, MqttTopic: string, Timestamp: datetime )
Abilitare l'inserimento in streaming
Abilitare l'inserimento in streaming nella tabella e nel database. Nella scheda query eseguire il comando seguente sostituendo <DATABASE_NAME>
con il nome del database:
.alter database <DATABASE_NAME> policy streamingingestion enable
Aggiungere l'identità gestita al cluster Esplora dati di Azure
Affinché il connettore esegua l'autenticazione in Azure Esplora dati, è necessario aggiungere l'identità gestita al cluster di Azure Esplora dati.
- In portale di Azure passare al cluster Kubernetes connesso ad Arc e selezionare Impostazioni> Estensioni. Nell'elenco di estensioni cercare il nome dell'estensione MQ IoT. Il nome inizia con seguito da
mq-
cinque caratteri casuali. Ad esempio, mq-4jgjs. Il nome dell'estensione MQ IoT corrisponde al nome dell'identità gestita MQ. - Nel database di Azure Esplora dati selezionare Autorizzazioni>Aggiungi>ingestor. Cercare il nome dell'identità gestita MQ e aggiungerlo.
Per altre informazioni sull'aggiunta di autorizzazioni, vedere Gestire le autorizzazioni del cluster Esplora dati di Azure.
A questo momento, è possibile distribuire il connettore e inviare dati ad Azure Esplora dati.
File di distribuzione di esempio
File di distribuzione di esempio per il connettore azure Esplora dati. I commenti che iniziano con TODO
richiedono di sostituire le impostazioni segnaposto con le informazioni.
apiVersion: mq.iotoperations.azure.com/v1beta1
name: my-adx-connector
namespace: azure-iot-operations
spec:
repository: mcr.microsoft.com/azureiotoperations/datalake
tag: 0.4.0-preview
pullPolicy: Always
databaseFormat: adx
target:
# TODO: insert the ADX cluster endpoint
endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
authentication:
systemAssignedManagedIdentity:
audience: https://api.kusto.windows.net
localBrokerConnection:
endpoint: aio-mq-dmqtt-frontend:8883
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
authentication:
kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: adx-topicmap
namespace: azure-iot-operations
spec:
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
# TODO: add DB and table name
tablePath: <DATABASE_NAME>
tableName: <TABLE_NAME>
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: MqttTopic
format: utf8
optional: false
mapping: $topic
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
In questo esempio vengono accettati dati dall'argomento azure-iot-operations/data/thermostat
con messaggi in formato JSON, ad esempio:
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
DataLake Connessione or
DataLake Connessione or è una risorsa personalizzata Kubernetes che definisce la configurazione e le proprietà di un'istanza del connettore Data Lake. Un connettore Data Lake inserisce dati da argomenti MQTT in tabelle Delta in un account data lake Archiviazione.
Il campo spec di una risorsa DataLake Connessione or contiene i sottocampi seguenti:
protocol
: versione MQTT. Può essere uno div5
ov3
.image
: il campo immagine specifica l'immagine del contenitore del modulo connettore data lake. Include i sottocampi seguenti:repository
: nome del registro contenitori e del repository in cui è archiviata l'immagine.tag
: tag dell'immagine da usare.pullPolicy
: criterio pull per l'immagine. Può essere uno diAlways
,IfNotPresent
oNever
.
instances
: numero di repliche del connettore Data Lake da eseguire.logLevel
: livello di log per il modulo del connettore Data Lake. Può essere uno ditrace
,debug
,info
warn
, ,error
ofatal
.databaseFormat
: formato dei dati da inserire nell'Archiviazione Data Lake. Può essere uno didelta
oparquet
.target
: il campo di destinazione specifica la destinazione dell'inserimento dati. Può esseredatalakeStorage
,fabricOneLake
,adx
olocalStorage
.datalakeStorage
: specifica la configurazione e le proprietà dell'account ADLSv2. Include i sottocampi seguenti:endpoint
: URL dell'endpoint dell'account data lake Archiviazione. Non includere alcuna barra/
finale.authentication
: il campo di autenticazione specifica il tipo e le credenziali per l'accesso all'account di Archiviazione Data Lake. Può trattarsi di uno dei seguenti elementi.accessTokenSecretName
: nome del segreto Kubernetes per l'uso dell'autenticazione con token di accesso condiviso per l'account data lake Archiviazione. Questo campo è obbligatorio se il tipo èaccessToken
.systemAssignedManagedIdentity
: per l'uso dell'identità gestita dal sistema per l'autenticazione. Ha un sottocampoaudience
: stringa sotto forma di per il gruppo di destinatari del token di identità gestita con ambito a livello dihttps://<my-account-name>.blob.core.windows.net
account ohttps://storage.azure.com
per qualsiasi account di archiviazione.
fabricOneLake
: specifica la configurazione e le proprietà di Microsoft Fabric OneLake. Include i sottocampi seguenti:endpoint
: URL dell'endpoint OneLake di Microsoft Fabric. In generehttps://onelake.dfs.fabric.microsoft.com
si tratta dell'endpoint globale di OneLake. Se si usa un endpoint a livello di area, è sotto forma dihttps://<region>-onelake.dfs.fabric.microsoft.com
. Non includere alcuna barra/
finale. Per altre informazioni, vedere Connessione a Microsoft OneLake.names
: specifica i nomi dell'area di lavoro e della lakehouse. Usare questo campo oguids
. Non usare entrambi. Include i sottocampi seguenti:workspaceName
: nome dell'area di lavoro.lakehouseName
: nome della lakehouse.
guids
: specifica i GUID dell'area di lavoro e della lakehouse. Usare questo campo onames
. Non usare entrambi. Include i sottocampi seguenti:workspaceGuid
: GUID dell'area di lavoro.lakehouseGuid
: GUID della lakehouse.
fabricPath
: posizione dei dati nell'area di lavoro Infrastruttura. Può esseretables
ofiles
. Se ètables
, i dati vengono archiviati in Fabric OneLake come tabelle. Se èfiles
, i dati vengono archiviati in Fabric OneLake come file. Se èfiles
, devedatabaseFormat
essereparquet
.authentication
: il campo di autenticazione specifica il tipo e le credenziali per l'accesso a Microsoft Fabric OneLake. Può esseresystemAssignedManagedIdentity
solo per il momento. Ha un sottocampo:systemAssignedManagedIdentity
: per l'uso dell'identità gestita dal sistema per l'autenticazione. Ha un sottocampoaudience
: stringa per il gruppo di destinatari del token di identità gestita e deve esserehttps://storage.azure.com
.
adx
: specifica la configurazione e le proprietà del database Esplora dati di Azure. Include i sottocampi seguenti:endpoint
: URL dell'endpoint del cluster Esplora dati di Azure, ad esempiohttps://<CLUSTER>.<REGION>.kusto.windows.net
. Non includere alcuna barra/
finale.authentication
: il campo di autenticazione specifica il tipo e le credenziali per l'accesso al cluster Esplora dati di Azure. Può esseresystemAssignedManagedIdentity
solo per il momento. Ha un sottocampo:systemAssignedManagedIdentity
: per l'uso dell'identità gestita dal sistema per l'autenticazione. Ha un sottocampoaudience
: stringa per il gruppo di destinatari del token di identità gestito e deve esserehttps://api.kusto.windows.net
.
localStorage
: specifica la configurazione e le proprietà dell'account di archiviazione locale. Include i sottocampi seguenti:volumeName
: nome del volume montato in ognuno dei pod del connettore.
localBrokerConnection
: usato per eseguire l'override della configurazione di connessione predefinita al broker MQTT di IoT MQTT. Vedere Gestire la connessione broker locale.
DataLake Connessione orTopicMap
DataLake Connessione orTopicMap è una risorsa personalizzata Kubernetes che definisce il mapping tra un argomento MQTT e una tabella Delta in un account data lake Archiviazione. Una risorsa DataLake Connessione orTopicMap fa riferimento a una risorsa DataLake Connessione or eseguita nello stesso dispositivo perimetrale e inserisce i dati dall'argomento MQTT nella tabella Delta.
Il campo di specifica di una risorsa DataLake Connessione orTopicMap contiene i sottocampi seguenti:
dataLakeConnectorRef
: nome della risorsa DataLake Connessione or a cui appartiene la mappa di questo argomento.mapping
: il campo di mapping specifica i dettagli e le proprietà dell'argomento MQTT e della tabella Delta. Include i sottocampi seguenti:allowedLatencySecs
: la latenza massima in secondi tra la ricezione di un messaggio dall'argomento MQTT e l'inserimento nella tabella Delta. Campo obbligatorio.clientId
: identificatore univoco per il client MQTT che sottoscrive l'argomento.maxMessagesPerBatch
: numero massimo di messaggi da inserire in un batch nella tabella Delta. A causa di una restrizione temporanea, questo valore deve essere minore di 16 seqos
è impostato su 1. Campo obbligatorio.messagePayloadType
: tipo di payload inviato all'argomento MQTT. Può essere uno dijson
oavro
(non ancora supportato).mqttSourceTopic
: nome degli argomenti MQTT a cui eseguire la sottoscrizione. Supporta la notazione con caratteri jolly dell'argomento MQTT.qos
: qualità del livello di servizio per la sottoscrizione all'argomento MQTT. Può essere uno di 0 o 1.table
: il campo tabella specifica la configurazione e le proprietà della tabella Delta nell'account di Archiviazione Data Lake. Include i sottocampi seguenti:tableName
: nome della tabella Delta a cui creare o aggiungere nell'account data lake Archiviazione. Questo campo è noto anche come nome del contenitore quando viene usato con Azure Data Lake Archiviazione Gen2. Può contenere qualsiasi lettera inglese minuscola e sottobar_
, con lunghezza massima di 256 caratteri. Non sono consentiti trattini-
o spazi.tablePath
: nome del database Esplora dati di Azure quando si usaadx
il connettore di tipo.schema
: schema della tabella Delta, che deve corrispondere al formato e ai campi del payload del messaggio. Si tratta di una matrice di oggetti, ognuno con i sottocampi seguenti:name
: nome della colonna nella tabella Delta.format
: tipo di dati della colonna nella tabella Delta. Può essere uno diboolean
,int8
,int16
int32
, ,int64
,uInt8
, ,uInt16
float16
float32
uInt64
uInt32
date32
timestamp
float64
,binary
o .utf8
I tipi senza segno, ad esempiouInt8
, non sono completamente supportati e vengono considerati come tipi firmati se specificati qui.optional
: valore booleano che indica se la colonna è facoltativa o obbligatoria. Questo campo è facoltativo e il valore predefinito è false.mapping
: espressione di percorso JSON che definisce come estrarre il valore della colonna dal payload del messaggio MQTT. I mapping$client_id
predefiniti ,$topic
,$properties
e$received_time
sono disponibili per l'uso come colonne per arricchire il codice JSON nel corpo del messaggio MQTT. Campo obbligatorio. Usare $properties per le proprietà utente MQTT. Ad esempio, $properties.assetId rappresenta il valore della proprietà assetId del messaggio MQTT.
Ecco un esempio di risorsa DataLake Connessione orTopicMap:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: datalake-topicmap
namespace: azure-iot-operations
spec:
dataLakeConnectorRef: my-datalake-connector
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
tableName: thermostat
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
JSON stringato come "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}"
non è supportato e fa sì che il connettore generi un errore di valore Null rilevato da un convertitore.
Messaggio di esempio per l'argomento azure-iot-operations/data/thermostat
che funziona con questo schema:
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
Che esegue il mapping a:
externalAssetId | assetName | CurrentTemperature | Pressione | mqttTopic | timestamp |
---|---|---|---|---|---|
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx | termostato-de | 5506 | 5506 | Dlc | 2024-04-02T22:36:03.1827681Z |
Importante
Se lo schema dei dati viene aggiornato, ad esempio viene modificato un tipo di dati o viene modificato un nome, la trasformazione dei dati in ingresso potrebbe smettere di funzionare. Se si verifica una modifica dello schema, è necessario modificare il nome della tabella dati.
Delta o parquet
Sono supportati sia i formati delta che parquet.
Gestire la connessione broker locale
Come il bridge MQTT, il connettore Data Lake funge da client per il broker MQTT di IoT MQTT. Se è stata personalizzata la porta del listener o l'autenticazione del broker MQTT di IoT MQTT, eseguire l'override della configurazione della connessione MQTT locale anche per il connettore Data Lake. Per altre informazioni, vedere Connessione broker locale del bridge MQTT.
Contenuto correlato
Pubblicare e sottoscrivere messaggi MQTT con l'anteprima mq di Azure IoT