Skicka data från förhandsversionen av Azure IoT MQ till Data Lake Storage

Viktigt!

Förhandsversion av Azure IoT Operations – aktiverad av Azure Arc finns för närvarande i FÖRHANDSVERSION. Du bör inte använda den här förhandsgranskningsprogramvaran i produktionsmiljöer.

Juridiska villkor för Azure-funktioner i betaversion, förhandsversion eller som av någon annan anledning inte har gjorts allmänt tillgängliga ännu finns i kompletterande användningsvillkor för Microsoft Azure-förhandsversioner.

Du kan använda data lake connector för att skicka data från Azure IoT MQ Preview Broker till en datasjö, till exempel Azure Data Lake Storage Gen2 (ADLSv2), Microsoft Fabric OneLake och Azure Data Explorer. Anslutningsappen prenumererar på MQTT-ämnen och matar in meddelandena i Delta-tabeller i Data Lake Storage-kontot.

Förutsättningar

Konfigurera för att skicka data till Microsoft Fabric OneLake med hanterad identitet

Konfigurera en data lake-anslutningsapp för att ansluta till Microsoft Fabric OneLake med hjälp av hanterad identitet.

  1. Se till att stegen i förhandskraven uppfylls, inklusive en Microsoft Fabric-arbetsyta och lakehouse. Min standardarbetsyta kan inte användas.

  2. Kontrollera att IoT MQ Arc-tillägget är installerat och konfigurerat med hanterad identitet.

  3. I Azure-portalen går du till det Arc-anslutna Kubernetes-klustret och väljer Inställningar> Extensions. Leta efter ditt IoT MQ-tilläggsnamn i tilläggslistan. Namnet börjar med mq- följt av fem slumpmässiga tecken. Till exempel mq-4jgjs.

  4. Hämta app-ID:t som är associerat med den hanterade identiteten för IoT MQ Arc-tillägget och anteckna GUID-värdet. App-ID :t skiljer sig från objektet eller huvudnamns-ID:t. Du kan använda Azure CLI genom att hitta objekt-ID:t för den hanterade identiteten och sedan köra frågor mot app-ID:t för tjänstens huvudnamn som är associerat med den hanterade identiteten. Till exempel:

    OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv)
    az ad sp show --query appId --id $OBJECT_ID --output tsv
    

    Du bör få utdata med ett GUID-värde:

    xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    

    Detta GUID är det app-ID som du behöver använda i nästa steg.

  5. I Microsoft Fabric-arbetsytan använder du Hantera åtkomst och väljer sedan + Lägg till personer eller grupper.

  6. Sök efter IoT MQ Arc-tillägget med namnet "mq" och välj det guid-värde för app-ID som du hittade i föregående steg.

  7. Välj Deltagare som roll och välj sedan Lägg till.

  8. Skapa en DataLake Anslut eller-resurs som definierar konfigurations- och slutpunktsinställningarna för anslutningsappen. Du kan använda YAML som ett exempel, men se till att ändra följande fält:

    • target.fabricOneLake.endpoint: Slutpunkten för Microsoft Fabric OneLake-kontot. Du kan hämta slutpunkts-URL:en från Microsoft Fabric Lakehouse under Egenskaper för filer>. URL:en bör se ut som https://onelake.dfs.fabric.microsoft.com.
    • target.fabricOneLake.names: Namnen på arbetsytan och sjöhuset. Använd antingen det här fältet eller guids. Använd inte båda.
      • workspaceName: Namnet på arbetsytan.
      • lakehouseName: Namnet på lakehouse.
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: info
      databaseFormat: delta
      target:
        fabricOneLake:
          # Example: https://onelake.dfs.fabric.microsoft.com
          endpoint: <example-endpoint-url>
          names:
            workspaceName: <example-workspace-name>
            lakehouseName: <example-lakehouse-name>
          ## OR
          # guids:
          #   workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          #   lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          fabricPath: tables
          authentication:
            systemAssignedManagedIdentity:
              audience: https://storage.azure.com/
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  9. Skapa en DataLake Anslut orTopicMap-resurs som definierar mappningen mellan MQTT-ämnet och deltatabellen i Data Lake Storage. Du kan använda YAML som ett exempel, men se till att ändra följande fält:

    • dataLakeConnectorRef: Namnet på den DataLake Anslut eller-resurs som du skapade tidigare.
    • clientId: En unik identifierare för din MQTT-klient.
    • mqttSourceTopic: Namnet på det MQTT-ämne som du vill att data ska komma från.
    • table.tableName: Namnet på tabellen som du vill lägga till i lakehouse. Tabellen skapas automatiskt om den inte finns.
    • table.schema: Schemat för deltatabellen som ska matcha formatet och fälten för de JSON-meddelanden som du skickar till MQTT-ämnet.
  10. Använd DataLake Anslut or- och DataLake Anslut orTopicMap-resurser på ditt Kubernetes-kluster med hjälp av kubectl apply -f datalake-connector.yaml.

  11. Börja skicka JSON-meddelanden till MQTT-ämnet med din MQTT-utgivare. Data Lake Connector-instansen prenumererar på ämnet och matar in meddelandena i Delta-tabellen.

  12. Kontrollera att data importeras till lakehouse med hjälp av en webbläsare. I Microsoft Fabric-arbetsytan väljer du ditt lakehouse och sedan Tabeller. Du bör se data i tabellen.

Oidentifierad tabell

Om dina data visas i tabellen Oidentifierad :

Orsaken kan vara tecken som inte stöds i tabellnamnet. Tabellnamnet måste vara ett giltigt Azure Storage-containernamn som innebär att det kan innehålla alla engelska bokstäver, versaler eller gemener och underbar _, med en längd på upp till 256 tecken. Inga bindestreck - eller blankstegstecken tillåts.

Konfigurera för att skicka data till Azure Data Lake Storage Gen2 med hjälp av SAS-token

Konfigurera en data lake-anslutning för att ansluta till ett Azure Data Lake Storage Gen2-konto (ADLS Gen2) med hjälp av en SAS-token (signatur för delad åtkomst).

  1. Hämta en SAS-token för ett Azure Data Lake Storage Gen2-konto (ADLS Gen2). Använd till exempel Azure-portalen för att bläddra till ditt lagringskonto. På menyn under Säkerhet + nätverk väljer du Signatur för delad åtkomst. Använd följande tabell för att ange de behörigheter som krävs.

    Parameter Värde
    Tillåtna tjänster Blob
    Tillåtna resurstyper Objekt, container
    Tillåtna behörigheter Läsa, skriva, ta bort, lista, skapa

    Om du vill optimera för minsta möjliga behörighet kan du också välja att hämta SAS för en enskild container. Om du vill förhindra autentiseringsfel kontrollerar du att containern matchar table.tableName värdet i ämnesmappningskonfigurationen.

  2. Skapa en Kubernetes-hemlighet med SAS-token. Ta inte med frågetecknet ? som kan finnas i början av token.

    kubectl create secret generic my-sas \
    --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \
    -n azure-iot-operations
    
  3. Skapa en DataLake Anslut eller-resurs som definierar konfigurations- och slutpunktsinställningarna för anslutningsappen. Du kan använda YAML som ett exempel, men se till att ändra följande fält:

    • endpoint: Data Lake Storage-slutpunkten för ADLSv2-lagringskontot i form av https://example.blob.core.windows.net. I Azure-portalen hittar du slutpunkten under Lagringskonto > Inställningar > Data Lake Storage för slutpunkter>.
    • accessTokenSecretName: Namnet på Kubernetes-hemligheten som innehåller SAS-token (my-sas från föregående exempel).
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: "debug"
      databaseFormat: "delta"
      target:
        datalakeStorage:
          endpoint: "https://example.blob.core.windows.net"
          authentication:
            accessTokenSecretName: "my-sas"
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  4. Skapa en DataLake Anslut orTopicMap-resurs som definierar mappningen mellan MQTT-ämnet och deltatabellen i Data Lake Storage. Du kan använda YAML som ett exempel, men se till att ändra följande fält:

    • dataLakeConnectorRef: Namnet på den DataLake Anslut eller-resurs som du skapade tidigare.
    • clientId: En unik identifierare för din MQTT-klient.
    • mqttSourceTopic: Namnet på det MQTT-ämne som du vill att data ska komma från.
    • table.tableName: Namnet på containern som du vill lägga till i Data Lake Storage. Om SAS-token är begränsad till kontot skapas containern automatiskt om den saknas.
    • table.schema: Schemat för deltatabellen, som ska matcha formatet och fälten för de JSON-meddelanden som du skickar till MQTT-ämnet.
  5. Använd DataLake Anslut or- och DataLake Anslut orTopicMap-resurserna på ditt Kubernetes-kluster med hjälp av kubectl apply -f datalake-connector.yaml.

  6. Börja skicka JSON-meddelanden till MQTT-ämnet med din MQTT-utgivare. Data Lake Connector-instansen prenumererar på ämnet och matar in meddelandena i Delta-tabellen.

  7. Kontrollera att Delta-tabellen har skapats med Hjälp av Azure-portalen. Filerna ordnas efter klient-ID, anslutningsinstansnamn, MQTT-ämne och tid. Öppna containern som du angav i DataLake Anslut orTopicMap i lagringskontot> Containrar. Kontrollera _delta_log finns och parque-filer visar MQTT-trafik. Öppna en parque-fil för att bekräfta att nyttolasten matchar det som skickades och definierades i schemat.

Använda hanterad identitet för autentisering till ADLSv2

Om du vill använda hanterad identitet anger du den som den enda metoden under DataLake Anslut eller authentication. Använd az k8s-extension show för att hitta huvud-ID:t för IoT MQ Arc-tillägget och tilldela sedan en roll till den hanterade identiteten som ger behörighet att skriva till lagringskontot, till exempel Storage Blob Data Contributor. Mer information finns i Auktorisera åtkomst till blobar med hjälp av Microsoft Entra-ID.

authentication:
  systemAssignedManagedIdentity:
    audience: https://my-account.blob.core.windows.net

Konfigurera för att skicka data till Azure Data Explorer med hjälp av hanterad identitet

Konfigurera data lake connector för att skicka data till en Azure Data Explorer-slutpunkt med hjälp av hanterad identitet.

  1. Se till att stegen i förhandskraven uppfylls, inklusive ett fullständigt Azure Data Explorer-kluster. Alternativet "kostnadsfritt kluster" fungerar inte.

  2. När klustret har skapats skapar du en databas för att lagra dina data.

  3. Du kan skapa en tabell för angivna data via Azure-portalen och skapa kolumner manuellt, eller så kan du använda KQL på frågefliken. Till exempel:

    .create table thermostat (
        externalAssetId: string,
        assetName: string,
        CurrentTemperature: real,
        Pressure: real,
        MqttTopic: string,
        Timestamp: datetime
    )
    

Aktivera inmatning av direktuppspelning

Aktivera strömmande inmatning i tabellen och databasen. Kör följande kommando på frågefliken och ersätt <DATABASE_NAME> med databasnamnet:

.alter database <DATABASE_NAME> policy streamingingestion enable

Lägga till den hanterade identiteten i Azure Data Explorer-klustret

För att anslutningsappen ska kunna autentiseras till Azure Data Explorer måste du lägga till den hanterade identiteten i Azure Data Explorer-klustret.

  1. I Azure-portalen går du till det Arc-anslutna Kubernetes-klustret och väljer Inställningar> Extensions. Leta efter namnet på ditt IoT MQ-tillägg i tilläggslistan. Namnet börjar med mq- följt av fem slumpmässiga tecken. Till exempel mq-4jgjs. IoT MQ-tilläggets namn är samma som namnet på den MQ-hanterade identiteten.
  2. I Azure Data Explorer-databasen väljer du Behörigheter>Lägg till>Ingestor. Sök efter det MQ-hanterade identitetsnamnet och lägg till det.

Mer information om hur du lägger till behörigheter finns i Hantera Azure Data Explorer-klusterbehörigheter.

Nu är du redo att distribuera anslutningsappen och skicka data till Azure Data Explorer.

Exempel på distributionsfil

Exempel på distributionsfil för Azure Data Explorer-anslutningsappen. Kommentarer som börjar med TODO kräver att du ersätter platshållarinställningarna med din information.

apiVersion: mq.iotoperations.azure.com/v1beta1
  name: my-adx-connector
  namespace: azure-iot-operations
spec:
    repository: mcr.microsoft.com/azureiotoperations/datalake
    tag: 0.4.0-preview
    pullPolicy: Always
  databaseFormat: adx
  target:
      # TODO: insert the ADX cluster endpoint
      endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
      authentication:
        systemAssignedManagedIdentity:
          audience: https://api.kusto.windows.net
  localBrokerConnection:
    endpoint: aio-mq-dmqtt-frontend:8883
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
    authentication:
      kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: adx-topicmap
  namespace: azure-iot-operations
spec:
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      # TODO: add DB and table name
      tablePath: <DATABASE_NAME>
      tableName: <TABLE_NAME>
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: MqttTopic
        format: utf8
        optional: false
        mapping: $topic
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

I det här exemplet accepteras data från ämnet azure-iot-operations/data/thermostat med meddelanden i JSON-format, till exempel följande:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

DataLake Anslut or

En DataLake Anslut or är en anpassad Kubernetes-resurs som definierar konfigurationen och egenskaperna för en data lake connector-instans. En data lake connector matar in data från MQTT-ämnen till Delta-tabeller i ett Data Lake Storage-konto.

Specifikationsfältet för en DataLake Anslut eller-resurs innehåller följande underfält:

  • protocol: MQTT-versionen. Det kan vara en av v5 eller v3.
  • image: Avbildningsfältet anger containeravbildningen av data lake connector-modulen. Den har följande underfält:
    • repository: Namnet på containerregistret och lagringsplatsen där avbildningen lagras.
    • tag: Taggen för avbildningen som ska användas.
    • pullPolicy: Pull-principen för avbildningen. Det kan vara en av Always, IfNotPresenteller Never.
  • instances: Antalet repliker av datasjöanslutningsappen som ska köras.
  • logLevel: Loggnivån för data lake connector-modulen. Det kan vara en av trace, debug, info, warn, erroreller fatal.
  • databaseFormat: Formatet för data som ska matas in i Data Lake Storage. Det kan vara en av delta eller parquet.
  • target: Målfältet anger målet för datainmatningen. Det kan vara datalakeStorage, fabricOneLake, adxeller localStorage.
    • datalakeStorage: Anger konfigurationen och egenskaperna för ADLSv2-kontot. Den har följande underfält:
      • endpoint: URL:en för Data Lake Storage-kontots slutpunkt. Ta inte med några avslutande snedstreck /.
      • authentication: Autentiseringsfältet anger typ och autentiseringsuppgifter för åtkomst till Data Lake Storage-kontot. Det kan vara något av följande.
        • accessTokenSecretName: Namnet på Kubernetes-hemligheten för att använda tokenautentisering med delad åtkomst för Data Lake Storage-kontot. Det här fältet krävs om typen är accessToken.
        • systemAssignedManagedIdentity: För att använda systemhanterad identitet för autentisering. Den har ett underfält
          • audience: En sträng i form av https://<my-account-name>.blob.core.windows.net för den hanterade identitetstokens målgrupp som är begränsad till kontonivå eller https://storage.azure.com för ett lagringskonto.
    • fabricOneLake: Anger konfigurationen och egenskaperna för Microsoft Fabric OneLake. Den har följande underfält:
      • endpoint: URL:en för Microsoft Fabric OneLake-slutpunkten. Det beror vanligtvis https://onelake.dfs.fabric.microsoft.com på att det är den globala OneLake-slutpunkten. Om du använder en regional slutpunkt är den i form av https://<region>-onelake.dfs.fabric.microsoft.com. Ta inte med några avslutande snedstreck /. Mer information finns i Anslut till Microsoft OneLake.
      • names: Anger namnen på arbetsytan och lakehouse. Använd antingen det här fältet eller guids. Använd inte båda. Den har följande underfält:
        • workspaceName: Namnet på arbetsytan.
        • lakehouseName: Namnet på lakehouse.
      • guids: Anger GUID för arbetsytan och lakehouse. Använd antingen det här fältet eller names. Använd inte båda. Den har följande underfält:
        • workspaceGuid: GUID för arbetsytan.
        • lakehouseGuid: GUID av lakehouseen.
      • fabricPath: Platsen för data i arbetsytan Infrastruktur. Det kan vara antingen tables eller files. Om det är tableslagras data i Fabric OneLake som tabeller. Om det är fileslagras data i Fabric OneLake som filer. Om det är filesmåste parquetvara databaseFormat .
      • authentication: Autentiseringsfältet anger typ och autentiseringsuppgifter för åtkomst till Microsoft Fabric OneLake. Det kan bara vara systemAssignedManagedIdentity för tillfället. Den har ett underfält:
      • systemAssignedManagedIdentity: För att använda systemhanterad identitet för autentisering. Den har ett underfält
        • audience: En sträng för den hanterade identitetstokens målgruppen och den måste vara https://storage.azure.com.
    • adx: Anger konfigurationen och egenskaperna för Azure Data Explorer-databasen. Den har följande underfält:
      • endpoint: URL:en för Azure Data Explorer-klusterslutpunkten, till exempel https://<CLUSTER>.<REGION>.kusto.windows.net. Ta inte med några avslutande snedstreck /.
      • authentication: Autentiseringsfältet anger typ och autentiseringsuppgifter för åtkomst till Azure Data Explorer-klustret. Det kan bara vara systemAssignedManagedIdentity för tillfället. Den har ett underfält:
        • systemAssignedManagedIdentity: För att använda systemhanterad identitet för autentisering. Den har ett underfält
          • audience: En sträng för den hanterade identitetstokens målgruppen och den ska vara https://api.kusto.windows.net.
    • localStorage: Anger konfigurationen och egenskaperna för det lokala lagringskontot. Den har följande underfält:
      • volumeName: Namnet på volymen som monteras i var och en av anslutningspoddarna.
  • localBrokerConnection: Används för att åsidosätta standardanslutningskonfigurationen till IoT MQ MQTT Broker. Se Hantera lokal asynkron koordinatoranslutning.

DataLake Anslut orTopicMap

En DataLake Anslut orTopicMap är en anpassad Kubernetes-resurs som definierar mappningen mellan ett MQTT-ämne och en Delta-tabell i ett Data Lake Storage-konto. En DataLake Anslut orTopicMap-resurs refererar till en DataLake Anslut eller-resurs som körs på samma gränsenhet och matar in data från MQTT-ämnet i Delta-tabellen.

Specifikationsfältet för en DataLake Anslut orTopicMap-resurs innehåller följande underfält:

  • dataLakeConnectorRef: Namnet på den DataLake Anslut eller-resurs som den här ämneskartan tillhör.
  • mapping: Mappningsfältet anger information och egenskaper för MQTT-ämnet och Delta-tabellen. Den har följande underfält:
    • allowedLatencySecs: Den maximala svarstiden i sekunder mellan att ta emot ett meddelande från MQTT-ämnet och mata in det i Delta-tabellen. Fältet är obligatoriskt.
    • clientId: En unik identifierare för MQTT-klienten som prenumererar på ämnet.
    • maxMessagesPerBatch: Det maximala antalet meddelanden som ska matas in i en batch i Delta-tabellen. På grund av en tillfällig begränsning måste det här värdet vara mindre än 16 om qos det är inställt på 1. Fältet är obligatoriskt.
    • messagePayloadType: Den typ av nyttolast som skickas till MQTT-ämnet. Det kan vara en av json eller avro (stöds inte ännu).
    • mqttSourceTopic: Namnet på de MQTT-ämnen som du vill prenumerera på. Stöder MQTT-ämnes jokertecken notation.
    • qos: Kvaliteten på tjänstnivån för att prenumerera på MQTT-ämnet. Det kan vara en av 0 eller 1.
    • table: Tabellfältet anger konfigurationen och egenskaperna för Delta-tabellen i Data Lake Storage-kontot. Den har följande underfält:
      • tableName: Namnet på deltatabellen som du vill skapa eller lägga till i i Data Lake Storage-kontot. Det här fältet kallas även containernamnet när det används med Azure Data Lake Storage Gen2. Den kan innehålla valfri engelsk gemener och underbar _, med en längd på upp till 256 tecken. Inga bindestreck - eller blankstegstecken tillåts.
      • tablePath: Namnet på Azure Data Explorer-databasen när du använder adx typanslutning.
      • schema: Schemat för Delta-tabellen, som ska matcha formatet och fälten för meddelandets nyttolast. Det är en matris med objekt, var och en med följande underfält:
        • name: Namnet på kolumnen i deltatabellen.
        • format: Datatypen för kolumnen i deltatabellen. Det kan vara en av boolean, int8, int16, int32, int64, uInt8, uInt16, uInt32, , uInt64, float16, float32, float64, date32, timestamp, binaryeller utf8. Osignerade typer, till exempel uInt8, stöds inte fullt ut och behandlas som signerade typer om de anges här.
        • optional: Ett booleskt värde som anger om kolumnen är valfri eller obligatorisk. Det här fältet är valfritt och är som standard falskt.
        • mapping: JSON-sökvägsuttryck som definierar hur du extraherar värdet för kolumnen från MQTT-meddelandenyttolasten. Inbyggda mappningar $client_id, $topic, $propertiesoch $received_time är tillgängliga att använda som kolumner för att utöka JSON i MQTT-meddelandetexten. Fältet är obligatoriskt. Använd $properties för MQTT-användaregenskaper. Till exempel representerar $properties.assetId värdet för egenskapen assetId från MQTT-meddelandet.

Här är ett exempel på en DataLake Anslut orTopicMap-resurs:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: datalake-topicmap
  namespace: azure-iot-operations
spec:
  dataLakeConnectorRef: my-datalake-connector
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      tableName: thermostat
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

Stringified JSON like "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}" stöds inte och gör att anslutningsappen genererar ett null-värdefel för en konverterare.

Ett exempelmeddelande för ämnet azure-iot-operations/data/thermostat som fungerar med det här schemat:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

Som mappar till:

externalAssetId assetName CurrentTemperature Lufttryck mqttTopic timestamp
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx thermostat-de 5506 5506 Dlc 2024-04-02T22:36:03.1827681Z

Viktigt!

Om dataschemat uppdateras, till exempel om en datatyp ändras eller ett namn ändras, kan omvandlingen av inkommande data sluta fungera. Du måste ändra datatabellens namn om en schemaändring sker.

Delta eller parquet

Både delta- och parquet-format stöds.

Hantera lokal asynkron koordinatoranslutning

Precis som MQTT-bryggan fungerar Data Lake Connector som en klient till IoT MQ MQTT-koordinatorn. Om du har anpassat lyssnarporten eller autentiseringen för din IoT MQ MQTT-koordinator åsidosätter du även den lokala MQTT-anslutningskonfigurationen för Data Lake Connector. Mer information finns i MQTT bridge local broker connection (MQTT bridge local broker connection).

Publicera och prenumerera på MQTT-meddelanden med azure IoT MQ Preview