Odesílání dat z Azure IoT MQ Preview do Data Lake Storage

Důležité

Azure IoT Operations Preview – Služba Azure Arc je aktuálně ve verzi PREVIEW. Tento software ve verzi Preview byste neměli používat v produkčních prostředích.

Právní podmínky, které platí pro funkce Azure, které jsou ve verzi beta, verzi Preview nebo které zatím nejsou veřejně dostupné, najdete v Dodatečných podmínkách použití pro Microsoft Azure verze Preview.

Pomocí konektoru Data Lake můžete odesílat data z zprostředkovatele Azure IoT MQ Preview do datového jezera, jako je Azure Data Lake Storage Gen2 (ADLSv2), Microsoft Fabric OneLake a Azure Data Explorer. Konektor se přihlásí k odběru témat MQTT a ingestuje zprávy do tabulek Delta v účtu Data Lake Storage.

Požadavky

Konfigurace odesílání dat do Microsoft Fabric OneLake pomocí spravované identity

Nakonfigurujte konektor Data Lake pro připojení k Microsoft Fabric OneLake pomocí spravované identity.

  1. Ujistěte se, že jsou splněné kroky v požadavcích, včetně pracovního prostoru Microsoft Fabric a jezerahouse. Výchozí pracovní prostor nelze použít.

  2. Ujistěte se, že je nainstalované a nakonfigurované rozšíření IoT MQ Arc se spravovanou identitou.

  3. Na webu Azure Portal přejděte do clusteru Kubernetes připojeného ke službě Arc a vyberte Nastavení> Extensions. V seznamu rozšíření vyhledejte název rozšíření IoT MQ. Název začíná pěti náhodnými mq- znaky. Například mq-4jgjs.

  4. Získejte ID aplikace přidružené ke spravované identitě rozšíření IoT MQ Arc a poznamenejte si hodnotu GUID. ID aplikace se liší od id objektu nebo objektu zabezpečení. Azure CLI můžete použít vyhledáním ID objektu spravované identity a následným dotazováním ID aplikace instančního objektu přidruženého ke spravované identitě. Příklad:

    OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv)
    az ad sp show --query appId --id $OBJECT_ID --output tsv
    

    Měli byste získat výstup s hodnotou GUID:

    xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    

    Tento identifikátor GUID je ID aplikace, které potřebujete použít v dalším kroku.

  5. V pracovním prostoru Microsoft Fabric použijte možnost Spravovat přístup a pak vyberte + Přidat lidi nebo skupiny.

  6. Vyhledejte rozšíření IoT MQ Arc podle názvu "mq" a nezapomeňte vybrat hodnotu GUID ID aplikace, kterou jste našli v předchozím kroku.

  7. Jako roli vyberte Přispěvatel a pak vyberte Přidat.

  8. Vytvořte prostředek DataLake Připojení or, který definuje nastavení konfigurace a koncového bodu konektoru. Jako příklad můžete použít YAML, ale nezapomeňte změnit následující pole:

    • target.fabricOneLake.endpoint: Koncový bod účtu Microsoft Fabric OneLake. Adresu URL koncového bodu můžete získat z Microsoft Fabric Lakehouse v části Vlastnosti souborů>. Adresa URL by měla vypadat takto https://onelake.dfs.fabric.microsoft.com.
    • target.fabricOneLake.names: Názvy pracovního prostoru a jezera. Použijte buď toto pole, nebo guids. Nepoužívejte obojí.
      • workspaceName: Název pracovního prostoru.
      • lakehouseName: Název jezera.
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: info
      databaseFormat: delta
      target:
        fabricOneLake:
          # Example: https://onelake.dfs.fabric.microsoft.com
          endpoint: <example-endpoint-url>
          names:
            workspaceName: <example-workspace-name>
            lakehouseName: <example-lakehouse-name>
          ## OR
          # guids:
          #   workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          #   lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          fabricPath: tables
          authentication:
            systemAssignedManagedIdentity:
              audience: https://storage.azure.com/
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  9. Vytvořte prostředek DataLake Připojení orTopicMap, který definuje mapování mezi tématem MQTT a tabulkou Delta ve službě Data Lake Storage. Jako příklad můžete použít YAML, ale nezapomeňte změnit následující pole:

    • dataLakeConnectorRef: Název prostředku DataLake Připojení or, který jste vytvořili dříve.
    • clientId: Jedinečný identifikátor klienta MQTT.
    • mqttSourceTopic: Název tématu MQTT, ze kterého chcete data pocházet.
    • table.tableName: Název tabulky, ke které chcete připojit v jezeře. Pokud tabulka neexistuje, vytvoří se automaticky.
    • table.schema: Schéma tabulky Delta, které by mělo odpovídat formátu a polím zpráv JSON, které odesíláte do tématu MQTT.
  10. Použití prostředků DataLake Připojení or a DataLake Připojení orTopicMap na cluster Kubernetes pomocí kubectl apply -f datalake-connector.yaml.

  11. Začněte odesílat zprávy JSON do tématu MQTT pomocí vydavatele MQTT. Instance konektoru Data Lake se přihlásí k odběru tématu a ingestuje zprávy do tabulky Delta.

  12. Pomocí prohlížeče ověřte, že se data naimportují do jezera. V pracovním prostoru Microsoft Fabric vyberte svůj lakehouse a pak Tabulky. Měla by se zobrazit data v tabulce.

Neidentifikovaná tabulka

Pokud se vaše data zobrazují v tabulce Neidentifikovaná :

Příčinou můžou být nepodporované znaky v názvu tabulky. Název tabulky musí být platný název kontejneru služby Azure Storage, který znamená, že může obsahovat libovolné anglické písmeno, velká nebo malá písmena a podbar _s délkou až 256 znaků. Nejsou povoleny - pomlčky ani mezery.

Konfigurace odesílání dat do Azure Data Lake Storage Gen2 pomocí tokenu SAS

Nakonfigurujte konektor Data Lake pro připojení k účtu Azure Data Lake Storage Gen2 (ADLS Gen2) pomocí tokenu sdíleného přístupového podpisu (SAS).

  1. Získejte token SAS pro účet Azure Data Lake Storage Gen2 (ADLS Gen2). Například pomocí webu Azure Portal přejděte ke svému účtu úložiště. V nabídce v části Zabezpečení a sítě zvolte Sdílený přístupový podpis. K nastavení požadovaných oprávnění použijte následující tabulku.

    Parametr Hodnota
    Povolené služby Objekt blob
    Povolené typy prostředků Objekt, kontejner
    Povolená oprávnění Čtení, zápis, odstranění, výpis, vytvoření

    Pokud chcete optimalizovat nejnižší oprávnění, můžete se také rozhodnout získat SAS pro jednotlivé kontejnery. Pokud chcete zabránit chybám ověřování, ujistěte se, že kontejner odpovídá table.tableName hodnotě v konfiguraci mapování tématu.

  2. Vytvořte tajný klíč Kubernetes pomocí tokenu SAS. Nezahrnujte otazník ? , který může být na začátku tokenu.

    kubectl create secret generic my-sas \
    --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \
    -n azure-iot-operations
    
  3. Vytvořte prostředek DataLake Připojení or, který definuje nastavení konfigurace a koncového bodu konektoru. Jako příklad můžete použít YAML, ale nezapomeňte změnit následující pole:

    • endpoint: Koncový bod Data Lake Storage účtu úložiště ADLSv2 ve formě https://example.blob.core.windows.net. Na webu Azure Portal najděte koncový bod v části Účet > úložiště Nastavení > Endpoints > Data Lake Storage.
    • accessTokenSecretName: Název tajného kódu Kubernetes obsahujícího token SAS (my-sas z předchozího příkladu).
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: "debug"
      databaseFormat: "delta"
      target:
        datalakeStorage:
          endpoint: "https://example.blob.core.windows.net"
          authentication:
            accessTokenSecretName: "my-sas"
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  4. Vytvořte prostředek DataLake Připojení orTopicMap, který definuje mapování mezi tématem MQTT a tabulkou Delta ve službě Data Lake Storage. Jako příklad můžete použít YAML, ale nezapomeňte změnit následující pole:

    • dataLakeConnectorRef: Název prostředku DataLake Připojení or, který jste vytvořili dříve.
    • clientId: Jedinečný identifikátor klienta MQTT.
    • mqttSourceTopic: Název tématu MQTT, ze kterého chcete data pocházet.
    • table.tableName: Název kontejneru, ke kterému se chcete připojit ve službě Data Lake Storage. Pokud je token SAS vymezený na účet, kontejner se automaticky vytvoří, pokud chybí.
    • table.schema: Schéma tabulky Delta, které by mělo odpovídat formátu a polím zpráv JSON, které odesíláte do tématu MQTT.
  5. Použijte prostředky DataLake Připojení or a DataLake Připojení orTopicMap na cluster Kubernetes pomocí kubectl apply -f datalake-connector.yaml.

  6. Začněte odesílat zprávy JSON do tématu MQTT pomocí vydavatele MQTT. Instance konektoru Data Lake se přihlásí k odběru tématu a ingestuje zprávy do tabulky Delta.

  7. Pomocí webu Azure Portal ověřte, že je vytvořena tabulka Delta. Soubory jsou uspořádané podle ID klienta, názvu instance konektoru, tématu MQTT a času. V kontejnerech účtu >úložiště otevřete kontejner, který jste zadali v objektu DataLake Připojení orTopicMap. Ověřte , _delta_log existují a soubory parque zobrazují provoz MQTT. Otevřete soubor parque, abyste potvrdili, že datová část odpovídá tomu, co bylo odesláno a definováno ve schématu.

Použití spravované identity pro ověřování v ADLSv2

Pokud chcete použít spravovanou identitu, zadejte ji jako jedinou metodu v části DataLake Připojení or authentication. Slouží az k8s-extension show k vyhledání ID objektu zabezpečení pro rozšíření IoT MQ Arc a přiřazení role spravované identitě, která uděluje oprávnění k zápisu do účtu úložiště, jako je přispěvatel dat objektů blob služby Storage. Další informace najdete v tématu Autorizace přístupu k objektům blob pomocí ID Microsoft Entra.

authentication:
  systemAssignedManagedIdentity:
    audience: https://my-account.blob.core.windows.net

Konfigurace odesílání dat do Azure Data Exploreru pomocí spravované identity

Nakonfigurujte konektor Data Lake tak, aby odesílal data do koncového bodu Azure Data Exploreru pomocí spravované identity.

  1. Ujistěte se, že jsou splněné kroky v požadavcích, včetně úplného clusteru Azure Data Exploreru. Možnost "Volný cluster" nefunguje.

  2. Po vytvoření clusteru vytvořte databázi pro ukládání dat.

  3. Tabulku pro daná data můžete vytvořit na webu Azure Portal a vytvořit sloupce ručně nebo můžete použít KQL na kartě dotazu. Příklad:

    .create table thermostat (
        externalAssetId: string,
        assetName: string,
        CurrentTemperature: real,
        Pressure: real,
        MqttTopic: string,
        Timestamp: datetime
    )
    

Povolení příjmu dat streamování

Povolte příjem dat streamování v tabulce a databázi. Na kartě dotazu spusťte následující příkaz a nahraďte <DATABASE_NAME> název databáze:

.alter database <DATABASE_NAME> policy streamingingestion enable

Přidání spravované identity do clusteru Azure Data Exploreru

Aby se konektor mohl ověřit v Azure Data Exploreru, musíte do clusteru Azure Data Exploreru přidat spravovanou identitu.

  1. Na webu Azure Portal přejděte do clusteru Kubernetes připojeného ke službě Arc a vyberte Nastavení> Extensions. V seznamu rozšíření vyhledejte název rozšíření IoT MQ. Název začíná pěti náhodnými mq- znaky. Například mq-4jgjs. Název rozšíření IoT MQ je stejný jako název spravované identity MQ.
  2. V databázi Azure Data Exploreru vyberte Oprávnění>Přidat>ingestor. Vyhledejte název spravované identity MQ a přidejte ji.

Další informace o přidávání oprávnění najdete v tématu Správa oprávnění clusteru Azure Data Exploreru.

Teď jste připraveni nasadit konektor a odesílat data do Azure Data Exploreru.

Ukázkový soubor nasazení

Příklad souboru nasazení pro konektor Azure Data Exploreru Komentáře, které začínají TODO , vyžadují, abyste nahradili nastavení zástupných symbolů vašimi informacemi.

apiVersion: mq.iotoperations.azure.com/v1beta1
  name: my-adx-connector
  namespace: azure-iot-operations
spec:
    repository: mcr.microsoft.com/azureiotoperations/datalake
    tag: 0.4.0-preview
    pullPolicy: Always
  databaseFormat: adx
  target:
      # TODO: insert the ADX cluster endpoint
      endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
      authentication:
        systemAssignedManagedIdentity:
          audience: https://api.kusto.windows.net
  localBrokerConnection:
    endpoint: aio-mq-dmqtt-frontend:8883
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
    authentication:
      kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: adx-topicmap
  namespace: azure-iot-operations
spec:
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      # TODO: add DB and table name
      tablePath: <DATABASE_NAME>
      tableName: <TABLE_NAME>
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: MqttTopic
        format: utf8
        optional: false
        mapping: $topic
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

Tento příklad přijímá data z azure-iot-operations/data/thermostat tématu se zprávami ve formátu JSON, například následující:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

DataLake Připojení or

DataLake Připojení or je vlastní prostředek Kubernetes, který definuje konfiguraci a vlastnosti instance konektoru Data Lake. Konektor Data Lake ingestuje data z témat MQTT do tabulek Delta v účtu Data Lake Storage.

Pole specifikace zdroje DataLake Připojení or obsahuje následující dílčí pole:

  • protocol: Verze MQTT. Může to být jeden z v5 nebo v3.
  • image: Pole image určuje image kontejneru modulu konektoru Data Lake. Má následující dílčí pole:
    • repository: Název registru kontejneru a úložiště, kde je image uložená.
    • tag: Značka obrázku, který se má použít.
    • pullPolicy: Zásady vyžádání změn pro image. Může to být jeden z Always, IfNotPresentnebo Never.
  • instances: Počet replik konektoru Data Lake, které se mají spustit.
  • logLevel: Úroveň protokolu pro modul konektoru Data Lake. Může to být jeden z trace, , debug, warninfo, , errornebo fatal.
  • databaseFormat: Formát dat, která se mají ingestovat do Data Lake Storage. Může to být jeden z delta nebo parquet.
  • target: Cílové pole určuje cíl příjmu dat. Může to být datalakeStorage, fabricOneLake, adxnebo localStorage.
    • datalakeStorage: Určuje konfiguraci a vlastnosti účtu ADLSv2. Má následující dílčí pole:
      • endpoint: Adresa URL koncového bodu účtu Data Lake Storage. Nezahrnujte žádné koncové lomítko /.
      • authentication: Ověřovací pole určuje typ a přihlašovací údaje pro přístup k účtu Data Lake Storage. Může to být jedna z následujících možností.
        • accessTokenSecretName: Název tajného kódu Kubernetes pro použití ověřování pomocí sdíleného přístupového tokenu pro účet Data Lake Storage. Toto pole je povinné, pokud je accessTokentyp .
        • systemAssignedManagedIdentity: Pro použití systémové spravované identity pro ověřování. Má jedno dílčí pole.
          • audience: Řetězec ve formě https://<my-account-name>.blob.core.windows.net cílové skupiny tokenů spravované identity vymezený na úroveň účtu nebo https://storage.azure.com pro jakýkoli účet úložiště.
    • fabricOneLake: Určuje konfiguraci a vlastnosti Microsoft Fabric OneLake. Má následující dílčí pole:
      • endpoint: Adresa URL koncového bodu Microsoft Fabric OneLake. Obvykle je https://onelake.dfs.fabric.microsoft.com to proto, že se jedná o globální koncový bod OneLake. Pokud používáte regionální koncový bod, je ve formě https://<region>-onelake.dfs.fabric.microsoft.com. Nezahrnujte žádné koncové lomítko /. Další informace najdete v tématu Připojení microsoft OneLake.
      • names: Určuje názvy pracovního prostoru a lakehouse. Použijte buď toto pole, nebo guids. Nepoužívejte obojí. Má následující dílčí pole:
        • workspaceName: Název pracovního prostoru.
        • lakehouseName: Název jezera.
      • guids: Určuje identifikátory GUID pracovního prostoru a lakehouse. Použijte buď toto pole, nebo names. Nepoužívejte obojí. Má následující dílčí pole:
        • workspaceGuid: Identifikátor GUID pracovního prostoru.
        • lakehouseGuid: IDENTIFIKÁTOR GUID jezera.
      • fabricPath: Umístění dat v pracovním prostoru Fabric. Může to být buď tables nebo files. Pokud ano tables, data se ukládají v tabulkách Fabric OneLake. Pokud ano files, data jsou uložená ve službě Fabric OneLake jako soubory. Pokud ano files, databaseFormat musí to být parquet.
      • authentication: Ověřovací pole určuje typ a přihlašovací údaje pro přístup k Microsoft Fabric OneLake. Teď to může být systemAssignedManagedIdentity jenom teď. Má jedno dílčí pole:
      • systemAssignedManagedIdentity: Pro použití systémové spravované identity pro ověřování. Má jedno dílčí pole.
        • audience: Řetězec pro cílovou skupinu tokenů spravované identity a musí být https://storage.azure.com.
    • adx: Určuje konfiguraci a vlastnosti databáze Azure Data Exploreru. Má následující dílčí pole:
      • endpoint: Adresa URL koncového bodu clusteru Azure Data Exploreru, jako je https://<CLUSTER>.<REGION>.kusto.windows.net. Nezahrnujte žádné koncové lomítko /.
      • authentication: Ověřovací pole určuje typ a přihlašovací údaje pro přístup ke clusteru Azure Data Exploreru. Teď to může být systemAssignedManagedIdentity jenom teď. Má jedno dílčí pole:
        • systemAssignedManagedIdentity: Pro použití systémové spravované identity pro ověřování. Má jedno dílčí pole.
          • audience: Řetězec pro cílovou skupinu tokenů spravované identity a měl by být https://api.kusto.windows.net.
    • localStorage: Určuje konfiguraci a vlastnosti místního účtu úložiště. Má následující dílčí pole:
      • volumeName: Název svazku, který je připojený ke každému podu konektoru.
  • localBrokerConnection: Slouží k přepsání výchozí konfigurace připojení ke zprostředkovateli IoT MQ MQTT. Viz Správa připojení místního zprostředkovatele.

DataLake Připojení orTopicMap

DataLake Připojení orTopicMap je vlastní prostředek Kubernetes, který definuje mapování mezi tématem MQTT a tabulkou Delta v účtu Data Lake Storage. Prostředek DataLake Připojení orTopicMap odkazuje na prostředek DataLake Připojení or, který běží na stejném hraničním zařízení a ingestuje data z tématu MQTT do tabulky Delta.

Pole specifikace prostředku DataLake Připojení orTopicMap obsahuje následující dílčí pole:

  • dataLakeConnectorRef: Název prostředku DataLake Připojení or, ke kterému toto téma mapě patří.
  • mapping: Pole mapování určuje podrobnosti a vlastnosti tématu MQTT a tabulky Delta. Má následující dílčí pole:
    • allowedLatencySecs: Maximální latence v sekundách mezi příjmem zprávy z tématu MQTT a ingestováním zprávy do tabulky Delta. Toto pole je povinné.
    • clientId: Jedinečný identifikátor klienta MQTT, který se přihlásí k odběru tématu.
    • maxMessagesPerBatch: Maximální počet zpráv, které se mají ingestovat v jedné dávce do tabulky Delta. Vzhledem k dočasnému omezení musí být tato hodnota menší než 16, pokud qos je nastavená na hodnotu 1. Toto pole je povinné.
    • messagePayloadType: Typ datové části, která se odešle do tématu MQTT. Může to být jedna z json nebo avro (zatím není podporovaná).
    • mqttSourceTopic: Název témat MQTT pro přihlášení k odběru. Podporuje zápis tématu MQTT se zástupnými znaménou znaménou.
    • qos: Kvalita úrovně služeb pro přihlášení k odběru tématu MQTT. Může to být jedna z 0 nebo 1.
    • table: Pole tabulky určuje konfiguraci a vlastnosti tabulky Delta v účtu Data Lake Storage. Má následující dílčí pole:
      • tableName: Název tabulky Delta, ke které se má vytvořit nebo připojit v účtu Data Lake Storage. Toto pole se také označuje jako název kontejneru při použití s Azure Data Lake Storage Gen2. Může obsahovat libovolné malé písmeno v angličtině a podbar _, s délkou až 256 znaků. Nejsou povoleny - pomlčky ani mezery.
      • tablePath: Název databáze Azure Data Exploreru při použití adx konektoru typu.
      • schema: Schéma tabulky Delta, které by mělo odpovídat formátu a polím datové části zprávy. Jedná se o pole objektů s následujícími dílčími poli:
        • name: Název sloupce v tabulce Delta.
        • format: Datový typ sloupce v tabulce Delta. Může to být jeden z boolean, int8, , , int64uInt16uInt64int16uInt8uInt32int32, float16, float32, float64, date32, timestamp, , , binarynebo .utf8 Nepodepsané typy, například uInt8, nejsou plně podporovány a jsou považovány za typy se znaménky, pokud jsou zde zadány.
        • optional: Logická hodnota, která označuje, jestli je sloupec nepovinný nebo povinný. Toto pole je volitelné a výchozí hodnota je false.
        • mapping: Výraz cesty JSON, který definuje, jak extrahovat hodnotu sloupce z datové části zprávy MQTT. Integrovaná mapování $client_id, $topic, $propertya $received_time jsou k dispozici jako sloupce k obohacení JSON v textu zprávy MQTT. Toto pole je povinné. Pro vlastnosti uživatele MQTT použijte $property. Například $property.assetId představuje hodnotu vlastnosti assetId ze zprávy MQTT.

Tady je příklad prostředku DataLake Připojení orTopicMap:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: datalake-topicmap
  namespace: azure-iot-operations
spec:
  dataLakeConnectorRef: my-datalake-connector
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      tableName: thermostat
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

Stringified JSON like "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}" není podporován a způsobí, že konektor vyvolá převodce našel chybu s hodnotou null.

Příklad zprávy pro azure-iot-operations/data/thermostat téma, které funguje s tímto schématem:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

Na které se mapuje:

externalAssetId assetName CurrentTemperature Tlak mqttTopic časové razítko
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx termostat-de 5506 5506 Dlc 2024-04-02T22:36:03.1827681Z

Důležité

Pokud se schéma dat aktualizuje, například změní se datový typ nebo se změní název, může transformace příchozích dat přestat fungovat. Pokud dojde ke změně schématu, musíte změnit název tabulky dat.

Delta nebo parquet

Podporují se formáty delta i parquet.

Správa připojení místního zprostředkovatele

Stejně jako most MQTT funguje konektor Data Lake jako klient pro zprostředkovatele IoT MQ MQTT. Pokud jste přizpůsobili port naslouchacího procesu nebo ověřování zprostředkovatele IoT MQTT MQTT, přepište také konfiguraci místního připojení MQTT pro konektor Data Lake. Další informace najdete v tématu Připojení místního zprostředkovatele MQTT.

Publikování a přihlášení k odběru zpráv MQTT pomocí Azure IoT MQ Preview