Share via


Inviare dati dall'anteprima MQ di Azure IoT a Data Lake Archiviazione

Importante

Anteprima delle operazioni di Azure IoT: abilitata da Azure Arc è attualmente disponibile in ANTEPRIMA. Non è consigliabile usare questo software di anteprima negli ambienti di produzione.

Vedere le condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure per termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale.

È possibile usare il connettore Data Lake per inviare dati dal broker di anteprima di Azure IoT MQ a un data lake, ad esempio Azure Data Lake Archiviazione Gen2 (ADLSv2), Microsoft Fabric OneLake e Azure Esplora dati. Il connettore sottoscrive gli argomenti MQTT e inserisce i messaggi nelle tabelle Delta nell'account data lake Archiviazione.

Prerequisiti

Configurare per inviare dati a Microsoft Fabric OneLake usando l'identità gestita

Configurare un connettore Data Lake per connettersi a Microsoft Fabric OneLake usando l'identità gestita.

  1. Assicurarsi che i passaggi descritti nei prerequisiti siano soddisfatti, inclusa un'area di lavoro di Microsoft Fabric e una lakehouse. Non è possibile usare l'area di lavoro predefinita.

  2. Assicurarsi che l'estensione MQ Arc di IoT sia installata e configurata con l'identità gestita.

  3. In portale di Azure passare al cluster Kubernetes connesso ad Arc e selezionare Impostazioni> Estensioni. Nell'elenco delle estensioni cercare il nome dell'estensione MQ IoT. Il nome inizia con seguito da mq- cinque caratteri casuali. Ad esempio, mq-4jgjs.

  4. Ottenere l'ID app associato all'identità gestita dell'estensione MQ Arc di IoT e annotare il valore GUID. L'ID app è diverso dall'oggetto o dall'ID entità. È possibile usare l'interfaccia della riga di comando di Azure individuando l'ID oggetto dell'identità gestita e quindi eseguendo una query sull'ID app dell'entità servizio associata all'identità gestita. Ad esempio:

    OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv)
    az ad sp show --query appId --id $OBJECT_ID --output tsv
    

    Dovrebbe essere visualizzato un output con un valore GUID:

    xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    

    Questo GUID è l'ID app che è necessario usare nel passaggio successivo.

  5. Nell'area di lavoro Di Microsoft Fabric usare Gestisci accesso, quindi selezionare + Aggiungi persone o gruppi.

  6. Cercare l'estensione IoT MQ Arc in base al nome "mq" e assicurarsi di selezionare il valore GUID DELL'ID app trovato nel passaggio precedente.

  7. Selezionare Collaboratore come ruolo, quindi selezionare Aggiungi.

  8. Creare una risorsa DataLake Connessione or che definisce le impostazioni di configurazione ed endpoint per il connettore. È possibile usare yaml fornito come esempio, ma assicurarsi di modificare i campi seguenti:

    • target.fabricOneLake.endpoint: endpoint dell'account OneLake di Microsoft Fabric. È possibile ottenere l'URL dell'endpoint da Microsoft Fabric lakehouse in Proprietà file>. L'URL dovrebbe essere simile https://onelake.dfs.fabric.microsoft.coma .
    • target.fabricOneLake.names: nomi dell'area di lavoro e della lakehouse. Usare questo campo o guids. Non usare entrambi.
      • workspaceName: nome dell'area di lavoro.
      • lakehouseName: nome della lakehouse.
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: info
      databaseFormat: delta
      target:
        fabricOneLake:
          # Example: https://onelake.dfs.fabric.microsoft.com
          endpoint: <example-endpoint-url>
          names:
            workspaceName: <example-workspace-name>
            lakehouseName: <example-lakehouse-name>
          ## OR
          # guids:
          #   workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          #   lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          fabricPath: tables
          authentication:
            systemAssignedManagedIdentity:
              audience: https://storage.azure.com/
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  9. Creare una risorsa DataLake Connessione orTopicMap che definisce il mapping tra l'argomento MQTT e la tabella Delta nel data lake Archiviazione. È possibile usare yaml fornito come esempio, ma assicurarsi di modificare i campi seguenti:

    • dataLakeConnectorRef: nome della risorsa DataLake Connessione or creata in precedenza.
    • clientId: identificatore univoco per il client MQTT.
    • mqttSourceTopic: nome dell'argomento MQTT da cui si vogliono ottenere i dati.
    • table.tableName: nome della tabella a cui si desidera aggiungere nella lakehouse. La tabella viene creata automaticamente se non esiste.
    • table.schema: schema della tabella Delta che deve corrispondere al formato e ai campi dei messaggi JSON inviati all'argomento MQTT.
  10. Applicare le risorse DataLake Connessione or e DataLake Connessione orTopicMap al cluster Kubernetes usando kubectl apply -f datalake-connector.yaml.

  11. Iniziare a inviare messaggi JSON all'argomento MQTT usando il server di pubblicazione MQTT. L'istanza del connettore Data Lake sottoscrive l'argomento e inserisce i messaggi nella tabella Delta.

  12. Usando un browser, verificare che i dati vengano importati nel lakehouse. Nell'area di lavoro Microsoft Fabric selezionare il lakehouse e quindi Tabelle. I dati verranno visualizzati nella tabella.

Tabella non identificata

Se i dati vengono visualizzati nella tabella Non identificata :

La causa potrebbe non essere supportata nel nome della tabella. Il nome della tabella deve essere un nome di contenitore valido Archiviazione di Azure che significa che può contenere qualsiasi lettera inglese, maiuscola o minuscola e barra inferiore_, con lunghezza massima di 256 caratteri. Non sono consentiti trattini - o spazi.

Configurare per inviare dati ad Azure Data Lake Archiviazione Gen2 usando il token di firma di accesso condiviso

Configurare un connettore Data Lake per connettersi a un account Azure Data Lake Archiviazione Gen2 (ADLS Gen2) usando un token di firma di accesso condiviso.

  1. Ottenere un token di firma di accesso condiviso per un account Azure Data Lake Archiviazione Gen2 (ADLS Gen2). Ad esempio, usare il portale di Azure per passare all'account di archiviazione. Nel menu in Sicurezza e rete scegliere Firma di accesso condiviso. Usare la tabella seguente per impostare le autorizzazioni necessarie.

    Parametro Valore
    Servizi consentiti BLOB
    Tipi di risorse consentiti Oggetto, contenitore
    Autorizzazioni consentite Lettura, scrittura, eliminazione, elenco, creazione

    Per ottimizzare i privilegi minimi, è anche possibile scegliere di ottenere la firma di accesso condiviso per un singolo contenitore. Per evitare errori di autenticazione, assicurarsi che il contenitore corrisponda al table.tableName valore nella configurazione della mappa dell'argomento.

  2. Creare un segreto Kubernetes con il token di firma di accesso condiviso. Non includere il punto interrogativo ? che potrebbe trovarsi all'inizio del token.

    kubectl create secret generic my-sas \
    --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \
    -n azure-iot-operations
    
  3. Creare una risorsa DataLake Connessione or che definisce le impostazioni di configurazione ed endpoint per il connettore. È possibile usare yaml fornito come esempio, ma assicurarsi di modificare i campi seguenti:

    • endpoint: endpoint di data lake Archiviazione dell'account di archiviazione ADLSv2 sotto forma di https://example.blob.core.windows.net. In portale di Azure trovare l'endpoint in Archiviazione account > Impostazioni > Endpoint > Data Lake Archiviazione.
    • accessTokenSecretName: nome del segreto Kubernetes contenente il token di firma di accesso condiviso (my-sas dall'esempio precedente).
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: "debug"
      databaseFormat: "delta"
      target:
        datalakeStorage:
          endpoint: "https://example.blob.core.windows.net"
          authentication:
            accessTokenSecretName: "my-sas"
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  4. Creare una risorsa DataLake Connessione orTopicMap che definisce il mapping tra l'argomento MQTT e la tabella Delta nel data lake Archiviazione. È possibile usare yaml fornito come esempio, ma assicurarsi di modificare i campi seguenti:

    • dataLakeConnectorRef: nome della risorsa DataLake Connessione or creata in precedenza.
    • clientId: identificatore univoco per il client MQTT.
    • mqttSourceTopic: nome dell'argomento MQTT da cui si vogliono ottenere i dati.
    • table.tableName: nome del contenitore a cui si vuole aggiungere nel data lake Archiviazione. Se l'ambito del token di firma di accesso condiviso è l'account, il contenitore viene creato automaticamente se mancante.
    • table.schema: schema della tabella Delta, che deve corrispondere al formato e ai campi dei messaggi JSON inviati all'argomento MQTT.
  5. Applicare le risorse DataLake Connessione or e DataLake Connessione orTopicMap al cluster Kubernetes usando kubectl apply -f datalake-connector.yaml.

  6. Iniziare a inviare messaggi JSON all'argomento MQTT usando il server di pubblicazione MQTT. L'istanza del connettore Data Lake sottoscrive l'argomento e inserisce i messaggi nella tabella Delta.

  7. Usando portale di Azure, verificare che la tabella Delta sia stata creata. I file sono organizzati in base all'ID client, al nome dell'istanza del connettore, all'argomento MQTT e all'ora. Nei contenitori dell'account >di archiviazione aprire il contenitore specificato in DataLake Connessione orTopicMap. Verificare _delta_log esistano file di _delta_log e i file di _delta_log mostrano il traffico MQTT. Aprire un file di classe per verificare che il payload corrisponda a ciò che è stato inviato e definito nello schema.

Usare l'identità gestita per l'autenticazione ad ADLSv2

Per usare l'identità gestita, specificarla come unico metodo in DataLake Connessione or authentication. Usare az k8s-extension show per trovare l'ID entità per l'estensione IoT MQ Arc, quindi assegnare un ruolo all'identità gestita che concede l'autorizzazione per scrivere nell'account di archiviazione, ad esempio Archiviazione Collaboratore dati BLOB. Per altre informazioni, vedere Autorizzare l'accesso ai BLOB usando Microsoft Entra ID.

authentication:
  systemAssignedManagedIdentity:
    audience: https://my-account.blob.core.windows.net

Configurare per inviare dati ad Azure Esplora dati usando l'identità gestita

Configurare il connettore Data Lake per inviare dati a un endpoint di Azure Esplora dati usando l'identità gestita.

  1. Assicurarsi che i passaggi descritti nei prerequisiti siano soddisfatti, incluso un cluster completo di Azure Esplora dati. L'opzione "cluster gratuito" non funziona.

  2. Dopo aver creato il cluster, creare un database per archiviare i dati.

  3. È possibile creare una tabella per i dati specificati tramite il portale di Azure e creare colonne manualmente oppure è possibile usare KQL nella scheda query. Per esempio:

    .create table thermostat (
        externalAssetId: string,
        assetName: string,
        CurrentTemperature: real,
        Pressure: real,
        MqttTopic: string,
        Timestamp: datetime
    )
    

Abilitare l'inserimento in streaming

Abilitare l'inserimento in streaming nella tabella e nel database. Nella scheda query eseguire il comando seguente sostituendo <DATABASE_NAME> con il nome del database:

.alter database <DATABASE_NAME> policy streamingingestion enable

Aggiungere l'identità gestita al cluster Esplora dati di Azure

Affinché il connettore esegua l'autenticazione in Azure Esplora dati, è necessario aggiungere l'identità gestita al cluster di Azure Esplora dati.

  1. In portale di Azure passare al cluster Kubernetes connesso ad Arc e selezionare Impostazioni> Estensioni. Nell'elenco di estensioni cercare il nome dell'estensione MQ IoT. Il nome inizia con seguito da mq- cinque caratteri casuali. Ad esempio, mq-4jgjs. Il nome dell'estensione MQ IoT corrisponde al nome dell'identità gestita MQ.
  2. Nel database di Azure Esplora dati selezionare Autorizzazioni>Aggiungi>ingestor. Cercare il nome dell'identità gestita MQ e aggiungerlo.

Per altre informazioni sull'aggiunta di autorizzazioni, vedere Gestire le autorizzazioni del cluster Esplora dati di Azure.

A questo momento, è possibile distribuire il connettore e inviare dati ad Azure Esplora dati.

File di distribuzione di esempio

File di distribuzione di esempio per il connettore azure Esplora dati. I commenti che iniziano con TODO richiedono di sostituire le impostazioni segnaposto con le informazioni.

apiVersion: mq.iotoperations.azure.com/v1beta1
  name: my-adx-connector
  namespace: azure-iot-operations
spec:
    repository: mcr.microsoft.com/azureiotoperations/datalake
    tag: 0.4.0-preview
    pullPolicy: Always
  databaseFormat: adx
  target:
      # TODO: insert the ADX cluster endpoint
      endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
      authentication:
        systemAssignedManagedIdentity:
          audience: https://api.kusto.windows.net
  localBrokerConnection:
    endpoint: aio-mq-dmqtt-frontend:8883
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
    authentication:
      kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: adx-topicmap
  namespace: azure-iot-operations
spec:
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      # TODO: add DB and table name
      tablePath: <DATABASE_NAME>
      tableName: <TABLE_NAME>
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: MqttTopic
        format: utf8
        optional: false
        mapping: $topic
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

In questo esempio vengono accettati dati dall'argomento azure-iot-operations/data/thermostat con messaggi in formato JSON, ad esempio:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

DataLake Connessione or

DataLake Connessione or è una risorsa personalizzata Kubernetes che definisce la configurazione e le proprietà di un'istanza del connettore Data Lake. Un connettore Data Lake inserisce dati da argomenti MQTT in tabelle Delta in un account data lake Archiviazione.

Il campo spec di una risorsa DataLake Connessione or contiene i sottocampi seguenti:

  • protocol: versione MQTT. Può essere uno di v5 o v3.
  • image: il campo immagine specifica l'immagine del contenitore del modulo connettore data lake. Include i sottocampi seguenti:
    • repository: nome del registro contenitori e del repository in cui è archiviata l'immagine.
    • tag: tag dell'immagine da usare.
    • pullPolicy: criterio pull per l'immagine. Può essere uno di Always, IfNotPresento Never.
  • instances: numero di repliche del connettore Data Lake da eseguire.
  • logLevel: livello di log per il modulo del connettore Data Lake. Può essere uno di trace, debug, infowarn, , erroro fatal.
  • databaseFormat: formato dei dati da inserire nell'Archiviazione Data Lake. Può essere uno di delta o parquet.
  • target: il campo di destinazione specifica la destinazione dell'inserimento dati. Può essere datalakeStorage, fabricOneLake, adxo localStorage.
    • datalakeStorage: specifica la configurazione e le proprietà dell'account ADLSv2. Include i sottocampi seguenti:
      • endpoint: URL dell'endpoint dell'account data lake Archiviazione. Non includere alcuna barra /finale.
      • authentication: il campo di autenticazione specifica il tipo e le credenziali per l'accesso all'account di Archiviazione Data Lake. Può trattarsi di uno dei seguenti elementi.
        • accessTokenSecretName: nome del segreto Kubernetes per l'uso dell'autenticazione con token di accesso condiviso per l'account data lake Archiviazione. Questo campo è obbligatorio se il tipo è accessToken.
        • systemAssignedManagedIdentity: per l'uso dell'identità gestita dal sistema per l'autenticazione. Ha un sottocampo
          • audience: stringa sotto forma di per il gruppo di destinatari del token di identità gestita con ambito a livello di https://<my-account-name>.blob.core.windows.net account o https://storage.azure.com per qualsiasi account di archiviazione.
    • fabricOneLake: specifica la configurazione e le proprietà di Microsoft Fabric OneLake. Include i sottocampi seguenti:
      • endpoint: URL dell'endpoint OneLake di Microsoft Fabric. In genere https://onelake.dfs.fabric.microsoft.com si tratta dell'endpoint globale di OneLake. Se si usa un endpoint a livello di area, è sotto forma di https://<region>-onelake.dfs.fabric.microsoft.com. Non includere alcuna barra /finale. Per altre informazioni, vedere Connessione a Microsoft OneLake.
      • names: specifica i nomi dell'area di lavoro e della lakehouse. Usare questo campo o guids. Non usare entrambi. Include i sottocampi seguenti:
        • workspaceName: nome dell'area di lavoro.
        • lakehouseName: nome della lakehouse.
      • guids: specifica i GUID dell'area di lavoro e della lakehouse. Usare questo campo o names. Non usare entrambi. Include i sottocampi seguenti:
        • workspaceGuid: GUID dell'area di lavoro.
        • lakehouseGuid: GUID della lakehouse.
      • fabricPath: posizione dei dati nell'area di lavoro Infrastruttura. Può essere tables o files. Se è tables, i dati vengono archiviati in Fabric OneLake come tabelle. Se è files, i dati vengono archiviati in Fabric OneLake come file. Se è files, deve databaseFormat essere parquet.
      • authentication: il campo di autenticazione specifica il tipo e le credenziali per l'accesso a Microsoft Fabric OneLake. Può essere systemAssignedManagedIdentity solo per il momento. Ha un sottocampo:
      • systemAssignedManagedIdentity: per l'uso dell'identità gestita dal sistema per l'autenticazione. Ha un sottocampo
        • audience: stringa per il gruppo di destinatari del token di identità gestita e deve essere https://storage.azure.com.
    • adx: specifica la configurazione e le proprietà del database Esplora dati di Azure. Include i sottocampi seguenti:
      • endpoint: URL dell'endpoint del cluster Esplora dati di Azure, ad esempio https://<CLUSTER>.<REGION>.kusto.windows.net. Non includere alcuna barra /finale.
      • authentication: il campo di autenticazione specifica il tipo e le credenziali per l'accesso al cluster Esplora dati di Azure. Può essere systemAssignedManagedIdentity solo per il momento. Ha un sottocampo:
        • systemAssignedManagedIdentity: per l'uso dell'identità gestita dal sistema per l'autenticazione. Ha un sottocampo
          • audience: stringa per il gruppo di destinatari del token di identità gestito e deve essere https://api.kusto.windows.net.
    • localStorage: specifica la configurazione e le proprietà dell'account di archiviazione locale. Include i sottocampi seguenti:
      • volumeName: nome del volume montato in ognuno dei pod del connettore.
  • localBrokerConnection: usato per eseguire l'override della configurazione di connessione predefinita al broker MQTT di IoT MQTT. Vedere Gestire la connessione broker locale.

DataLake Connessione orTopicMap

DataLake Connessione orTopicMap è una risorsa personalizzata Kubernetes che definisce il mapping tra un argomento MQTT e una tabella Delta in un account data lake Archiviazione. Una risorsa DataLake Connessione orTopicMap fa riferimento a una risorsa DataLake Connessione or eseguita nello stesso dispositivo perimetrale e inserisce i dati dall'argomento MQTT nella tabella Delta.

Il campo di specifica di una risorsa DataLake Connessione orTopicMap contiene i sottocampi seguenti:

  • dataLakeConnectorRef: nome della risorsa DataLake Connessione or a cui appartiene la mappa di questo argomento.
  • mapping: il campo di mapping specifica i dettagli e le proprietà dell'argomento MQTT e della tabella Delta. Include i sottocampi seguenti:
    • allowedLatencySecs: la latenza massima in secondi tra la ricezione di un messaggio dall'argomento MQTT e l'inserimento nella tabella Delta. Campo obbligatorio.
    • clientId: identificatore univoco per il client MQTT che sottoscrive l'argomento.
    • maxMessagesPerBatch: numero massimo di messaggi da inserire in un batch nella tabella Delta. A causa di una restrizione temporanea, questo valore deve essere minore di 16 se qos è impostato su 1. Campo obbligatorio.
    • messagePayloadType: tipo di payload inviato all'argomento MQTT. Può essere uno di json o avro (non ancora supportato).
    • mqttSourceTopic: nome degli argomenti MQTT a cui eseguire la sottoscrizione. Supporta la notazione con caratteri jolly dell'argomento MQTT.
    • qos: qualità del livello di servizio per la sottoscrizione all'argomento MQTT. Può essere uno di 0 o 1.
    • table: il campo tabella specifica la configurazione e le proprietà della tabella Delta nell'account di Archiviazione Data Lake. Include i sottocampi seguenti:
      • tableName: nome della tabella Delta a cui creare o aggiungere nell'account data lake Archiviazione. Questo campo è noto anche come nome del contenitore quando viene usato con Azure Data Lake Archiviazione Gen2. Può contenere qualsiasi lettera inglese minuscola e sottobar _, con lunghezza massima di 256 caratteri. Non sono consentiti trattini - o spazi.
      • tablePath: nome del database Esplora dati di Azure quando si usa adx il connettore di tipo.
      • schema: schema della tabella Delta, che deve corrispondere al formato e ai campi del payload del messaggio. Si tratta di una matrice di oggetti, ognuno con i sottocampi seguenti:
        • name: nome della colonna nella tabella Delta.
        • format: tipo di dati della colonna nella tabella Delta. Può essere uno di boolean, int8, int16int32, , int64, uInt8, , uInt16float16float32uInt64uInt32date32timestampfloat64, binaryo .utf8 I tipi senza segno, ad esempio uInt8, non sono completamente supportati e vengono considerati come tipi firmati se specificati qui.
        • optional: valore booleano che indica se la colonna è facoltativa o obbligatoria. Questo campo è facoltativo e il valore predefinito è false.
        • mapping: espressione di percorso JSON che definisce come estrarre il valore della colonna dal payload del messaggio MQTT. I mapping $client_idpredefiniti , $topic, $propertiese $received_time sono disponibili per l'uso come colonne per arricchire il codice JSON nel corpo del messaggio MQTT. Campo obbligatorio. Usare $properties per le proprietà utente MQTT. Ad esempio, $properties.assetId rappresenta il valore della proprietà assetId del messaggio MQTT.

Ecco un esempio di risorsa DataLake Connessione orTopicMap:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: datalake-topicmap
  namespace: azure-iot-operations
spec:
  dataLakeConnectorRef: my-datalake-connector
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      tableName: thermostat
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

JSON stringato come "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}" non è supportato e fa sì che il connettore generi un errore di valore Null rilevato da un convertitore.

Messaggio di esempio per l'argomento azure-iot-operations/data/thermostat che funziona con questo schema:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

Che esegue il mapping a:

externalAssetId assetName CurrentTemperature Pressione mqttTopic timestamp
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx termostato-de 5506 5506 Dlc 2024-04-02T22:36:03.1827681Z

Importante

Se lo schema dei dati viene aggiornato, ad esempio viene modificato un tipo di dati o viene modificato un nome, la trasformazione dei dati in ingresso potrebbe smettere di funzionare. Se si verifica una modifica dello schema, è necessario modificare il nome della tabella dati.

Delta o parquet

Sono supportati sia i formati delta che parquet.

Gestire la connessione broker locale

Come il bridge MQTT, il connettore Data Lake funge da client per il broker MQTT di IoT MQTT. Se è stata personalizzata la porta del listener o l'autenticazione del broker MQTT di IoT MQTT, eseguire l'override della configurazione della connessione MQTT locale anche per il connettore Data Lake. Per altre informazioni, vedere Connessione broker locale del bridge MQTT.

Pubblicare e sottoscrivere messaggi MQTT con l'anteprima mq di Azure IoT