Udostępnij za pośrednictwem


Wysyłanie danych z usługi Azure IoT MQ (wersja zapoznawcza) do usługi Data Lake Storage

Ważne

Usługa Azure IoT Operations Preview — włączona przez usługę Azure Arc jest obecnie dostępna w wersji zapoznawczej. Nie należy używać tego oprogramowania w wersji zapoznawczej w środowiskach produkcyjnych.

Zobacz Dodatkowe warunki użytkowania wersji zapoznawczych platformy Microsoft Azure, aby zapoznać się z postanowieniami prawnymi dotyczącymi funkcji platformy Azure, które są w wersji beta lub wersji zapoznawczej albo w inny sposób nie zostały jeszcze wydane jako ogólnie dostępne.

Łącznik data lake umożliwia wysyłanie danych z brokera usługi Azure IoT MQ w wersji zapoznawczej do magazynu typu data lake, takiego jak Azure Data Lake Storage Gen2 (ADLSv2), Microsoft Fabric OneLake i Azure Data Explorer. Łącznik subskrybuje tematy MQTT i pozyskuje komunikaty do tabel delty na koncie usługi Data Lake Storage.

Wymagania wstępne

Konfigurowanie wysyłania danych do usługi Microsoft Fabric OneLake przy użyciu tożsamości zarządzanej

Skonfiguruj łącznik usługi Data Lake w celu nawiązania połączenia z usługą Microsoft Fabric OneLake przy użyciu tożsamości zarządzanej.

  1. Upewnij się, że zostały spełnione kroki opisane w wymaganiach wstępnych, w tym obszar roboczy usługi Microsoft Fabric i usługa Lakehouse. Nie można użyć domyślnego obszaru roboczego .

  2. Upewnij się, że rozszerzenie usługi IoT MQ Arc jest zainstalowane i skonfigurowane przy użyciu tożsamości zarządzanej.

  3. W witrynie Azure Portal przejdź do klastra Kubernetes połączonego z usługą Arc i wybierz pozycję Ustawienia> Extensions. Na liście rozszerzeń wyszukaj nazwę rozszerzenia IoT MQ. Nazwa zaczyna się od mq- pięciu losowych znaków. Na przykład mq-4jgjs.

  4. Pobierz identyfikator aplikacji skojarzony z tożsamością zarządzaną rozszerzenia IoT MQ Arc i zanotuj wartość identyfikatora GUID. Identyfikator aplikacji różni się od obiektu lub identyfikatora podmiotu zabezpieczeń. Interfejs wiersza polecenia platformy Azure można użyć, wyszukując identyfikator obiektu tożsamości zarządzanej, a następnie wysyłając zapytanie do identyfikatora aplikacji jednostki usługi skojarzonej z tożsamością zarządzaną. Na przykład:

    OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv)
    az ad sp show --query appId --id $OBJECT_ID --output tsv
    

    Powinny zostać wyświetlone dane wyjściowe z wartością identyfikatora GUID:

    xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    

    Ten identyfikator GUID to identyfikator aplikacji, który należy użyć w następnym kroku.

  5. W obszarze roboczym usługi Microsoft Fabric użyj pozycji Zarządzaj dostępem, a następnie wybierz pozycję + Dodaj osoby lub grupy.

  6. Wyszukaj rozszerzenie IoT MQ Arc o nazwie "mq" i upewnij się, że wybrano wartość identyfikatora GUID identyfikatora aplikacji znalezioną w poprzednim kroku.

  7. Wybierz pozycję Współautor jako rolę, a następnie wybierz pozycję Dodaj.

  8. Utwórz zasób DataLake Połączenie or, który definiuje ustawienia konfiguracji i punktu końcowego dla łącznika. Możesz użyć podanego przykładu kodu YAML, ale pamiętaj, aby zmienić następujące pola:

    • target.fabricOneLake.endpoint: punkt końcowy konta onelake usługi Microsoft Fabric. Adres URL punktu końcowego można uzyskać z usługi Microsoft Fabric Lakehouse w obszarze Właściwości plików>. Adres URL powinien wyglądać następująco: https://onelake.dfs.fabric.microsoft.com.
    • target.fabricOneLake.names: nazwy obszaru roboczego i lakehouse. Użyj tego pola lub guids. Nie używaj obu tych elementów.
      • workspaceName: nazwa obszaru roboczego.
      • lakehouseName: nazwa jeziora.
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: info
      databaseFormat: delta
      target:
        fabricOneLake:
          # Example: https://onelake.dfs.fabric.microsoft.com
          endpoint: <example-endpoint-url>
          names:
            workspaceName: <example-workspace-name>
            lakehouseName: <example-lakehouse-name>
          ## OR
          # guids:
          #   workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          #   lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          fabricPath: tables
          authentication:
            systemAssignedManagedIdentity:
              audience: https://storage.azure.com/
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  9. Utwórz zasób DataLake Połączenie orTopicMap, który definiuje mapowanie między tematem MQTT a tabelą delty w usłudze Data Lake Storage. Możesz użyć podanego przykładu kodu YAML, ale pamiętaj, aby zmienić następujące pola:

    • dataLakeConnectorRef: nazwa utworzonego wcześniej zasobu DataLake Połączenie or.
    • clientId: unikatowy identyfikator klienta MQTT.
    • mqttSourceTopic: nazwa tematu MQTT, z którego mają pochodzić dane.
    • table.tableName: nazwa tabeli, do której chcesz dołączyć element w lakehouse. Tabela jest tworzona automatycznie, jeśli nie istnieje.
    • table.schema: schemat tabeli delta, która powinna być zgodna z formatem i polami komunikatów JSON wysyłanych do tematu MQTT.
  10. Zastosuj zasoby DataLake Połączenie or i DataLake Połączenie orTopicMap do klastra Kubernetes przy użyciu polecenia kubectl apply -f datalake-connector.yaml.

  11. Zacznij wysyłać komunikaty JSON do tematu MQTT przy użyciu wydawcy MQTT. Wystąpienie łącznika usługi Data Lake subskrybuje temat i pozyskuje komunikaty do tabeli delty.

  12. Za pomocą przeglądarki sprawdź, czy dane są importowane do magazynu lakehouse. W obszarze roboczym usługi Microsoft Fabric wybierz swoją usługę Lakehouse, a następnie pozycję Tabele. Dane powinny być widoczne w tabeli.

Niezidentyfikowana tabela

Jeśli dane są wyświetlane w tabeli Niezidentyfikowane :

Przyczyną mogą być nieobsługiwane znaki w nazwie tabeli. Nazwa tabeli musi być prawidłową nazwą kontenera usługi Azure Storage, co oznacza, że może zawierać dowolną cyfrę angielską, górną lub małą literę oraz pasek underbar _, o długości do 256 znaków. Nie są dozwolone żadne kreski - ani znaki spacji.

Konfigurowanie wysyłania danych do usługi Azure Data Lake Storage Gen2 przy użyciu tokenu SAS

Skonfiguruj łącznik usługi Data Lake w celu nawiązania połączenia z kontem usługi Azure Data Lake Storage Gen2 (ADLS Gen2) przy użyciu tokenu sygnatury dostępu współdzielonego (SAS).

  1. Uzyskaj token SAS dla konta usługi Azure Data Lake Storage Gen2 (ADLS Gen2). Na przykład użyj witryny Azure Portal, aby przejść do konta magazynu. W menu w obszarze Zabezpieczenia i sieć wybierz pozycję Sygnatura dostępu współdzielonego. Użyj poniższej tabeli, aby ustawić wymagane uprawnienia.

    Parametr Wartość
    Dozwolone usługi Obiekt blob
    Dozwolone typy zasobów Obiekt, kontener
    Dozwolone uprawnienia Odczyt, zapis, usuwanie, lista, tworzenie

    Aby zoptymalizować pod kątem najniższych uprawnień, możesz również pobrać sygnaturę dostępu współdzielonego dla pojedynczego kontenera. Aby zapobiec błędom uwierzytelniania, upewnij się, że kontener jest zgodny z wartością table.tableName w konfiguracji mapy tematu.

  2. Utwórz wpis tajny kubernetes z tokenem SAS. Nie dołączaj znaku ? zapytania, który może znajdować się na początku tokenu.

    kubectl create secret generic my-sas \
    --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \
    -n azure-iot-operations
    
  3. Utwórz zasób DataLake Połączenie or, który definiuje ustawienia konfiguracji i punktu końcowego dla łącznika. Możesz użyć podanego przykładu kodu YAML, ale pamiętaj, aby zmienić następujące pola:

    • endpoint: punkt końcowy usługi Data Lake Storage konta magazynu ADLSv2 w postaci https://example.blob.core.windows.net. W witrynie Azure Portal znajdź punkt końcowy w obszarze Konto > magazynu Ustawienia > Endpoints > Data Lake Storage.
    • accessTokenSecretName: nazwa wpisu tajnego kubernetes zawierającego token SAS (my-sas z poprzedniego przykładu).
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: "debug"
      databaseFormat: "delta"
      target:
        datalakeStorage:
          endpoint: "https://example.blob.core.windows.net"
          authentication:
            accessTokenSecretName: "my-sas"
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  4. Utwórz zasób DataLake Połączenie orTopicMap, który definiuje mapowanie między tematem MQTT a tabelą delty w usłudze Data Lake Storage. Możesz użyć podanego przykładu kodu YAML, ale pamiętaj, aby zmienić następujące pola:

    • dataLakeConnectorRef: nazwa utworzonego wcześniej zasobu DataLake Połączenie or.
    • clientId: unikatowy identyfikator klienta MQTT.
    • mqttSourceTopic: nazwa tematu MQTT, z którego mają pochodzić dane.
    • table.tableName: nazwa kontenera, do którego chcesz dołączyć usługę Data Lake Storage. Jeśli token SYGNATURy dostępu współdzielonego jest w zakresie konta, kontener zostanie utworzony automatycznie, jeśli nie ma go.
    • table.schema: schemat tabeli delta, który powinien być zgodny z formatem i polami komunikatów JSON wysyłanych do tematu MQTT.
  5. Zastosuj zasoby DataLake Połączenie or i DataLake Połączenie orTopicMap do klastra Kubernetes przy użyciu polecenia kubectl apply -f datalake-connector.yaml.

  6. Zacznij wysyłać komunikaty JSON do tematu MQTT przy użyciu wydawcy MQTT. Wystąpienie łącznika usługi Data Lake subskrybuje temat i pozyskuje komunikaty do tabeli delty.

  7. W witrynie Azure Portal sprawdź, czy tabela delta została utworzona. Pliki są uporządkowane według identyfikatora klienta, nazwy wystąpienia łącznika, tematu MQTT i czasu. W kontenerach konta >magazynu otwórz kontener określony w obiekcie DataLake Połączenie orTopicMap. Sprawdź, czy _delta_log istnieje, a pliki parque pokazują ruch MQTT. Otwórz plik parque, aby potwierdzić, że ładunek jest zgodny z tym, co zostało wysłane i zdefiniowane w schemacie.

Używanie tożsamości zarządzanej do uwierzytelniania w usłudze ADLSv2

Aby użyć tożsamości zarządzanej, określ ją jako jedyną metodę w obszarze DataLake Połączenie or authentication. Użyj az k8s-extension show polecenia , aby znaleźć identyfikator podmiotu zabezpieczeń rozszerzenia usługi IoT MQ Arc, a następnie przypisać rolę do tożsamości zarządzanej, która udziela uprawnień do zapisu na koncie magazynu, takiego jak Współautor danych obiektu blob usługi Storage. Aby dowiedzieć się więcej, zobacz Autoryzowanie dostępu do obiektów blob przy użyciu identyfikatora Entra firmy Microsoft.

authentication:
  systemAssignedManagedIdentity:
    audience: https://my-account.blob.core.windows.net

Konfigurowanie wysyłania danych do usługi Azure Data Explorer przy użyciu tożsamości zarządzanej

Skonfiguruj łącznik usługi Data Lake do wysyłania danych do punktu końcowego usługi Azure Data Explorer przy użyciu tożsamości zarządzanej.

  1. Upewnij się, że zostały spełnione kroki opisane w wymaganiach wstępnych, w tym pełny klaster usługi Azure Data Explorer. Opcja "bezpłatny klaster" nie działa.

  2. Po utworzeniu klastra utwórz bazę danych do przechowywania danych.

  3. Tabelę dla danych można utworzyć za pośrednictwem witryny Azure Portal i ręcznie utworzyć kolumny lub użyć języka KQL na karcie zapytania. Na przykład:

    .create table thermostat (
        externalAssetId: string,
        assetName: string,
        CurrentTemperature: real,
        Pressure: real,
        MqttTopic: string,
        Timestamp: datetime
    )
    

Włączanie pozyskiwania danych przesyłanych strumieniowo

Włącz pozyskiwanie strumieniowe w tabeli i bazie danych. Na karcie zapytania uruchom następujące polecenie, zastępując <DATABASE_NAME> nazwę bazy danych:

.alter database <DATABASE_NAME> policy streamingingestion enable

Dodawanie tożsamości zarządzanej do klastra usługi Azure Data Explorer

Aby łącznik był uwierzytelniany w usłudze Azure Data Explorer, należy dodać tożsamość zarządzaną do klastra usługi Azure Data Explorer.

  1. W witrynie Azure Portal przejdź do klastra Kubernetes połączonego z usługą Arc i wybierz pozycję Ustawienia> Extensions. Na liście rozszerzeń wyszukaj nazwę rozszerzenia IoT MQ. Nazwa zaczyna się od mq- pięciu losowych znaków. Na przykład mq-4jgjs. Nazwa rozszerzenia IoT MQ jest taka sama jak nazwa tożsamości zarządzanej MQ.
  2. W bazie danych usługi Azure Data Explorer wybierz pozycję Uprawnienia>Dodaj>ingestor. Wyszukaj nazwę tożsamości zarządzanej MQ i dodaj ją.

Aby uzyskać więcej informacji na temat dodawania uprawnień, zobacz Zarządzanie uprawnieniami klastra usługi Azure Data Explorer.

Teraz możesz przystąpić do wdrażania łącznika i wysyłania danych do usługi Azure Data Explorer.

Przykładowy plik wdrożenia

Przykładowy plik wdrożenia łącznika usługi Azure Data Explorer. Komentarze, które zaczynają się od TODO , wymagają zastąpienia ustawień symboli zastępczych informacjami.

apiVersion: mq.iotoperations.azure.com/v1beta1
  name: my-adx-connector
  namespace: azure-iot-operations
spec:
    repository: mcr.microsoft.com/azureiotoperations/datalake
    tag: 0.4.0-preview
    pullPolicy: Always
  databaseFormat: adx
  target:
      # TODO: insert the ADX cluster endpoint
      endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
      authentication:
        systemAssignedManagedIdentity:
          audience: https://api.kusto.windows.net
  localBrokerConnection:
    endpoint: aio-mq-dmqtt-frontend:8883
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
    authentication:
      kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: adx-topicmap
  namespace: azure-iot-operations
spec:
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      # TODO: add DB and table name
      tablePath: <DATABASE_NAME>
      tableName: <TABLE_NAME>
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: MqttTopic
        format: utf8
        optional: false
        mapping: $topic
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

W tym przykładzie dane z tematu azure-iot-operations/data/thermostat są akceptowane z komunikatami w formacie JSON, takimi jak:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

DataLake Połączenie or

Obiekt DataLake Połączenie or to niestandardowy zasób kubernetes, który definiuje konfigurację i właściwości wystąpienia łącznika usługi Data Lake. Łącznik usługi Data Lake pozyskuje dane z tematów MQTT do tabel delty na koncie usługi Data Lake Storage.

Pole specyfikacji zasobu DataLake Połączenie or zawiera następujące pola podrzędne:

  • protocol: wersja MQTT. Może to być jeden z v5 lub v3.
  • image: Pole obrazu określa obraz kontenera modułu łącznika usługi Data Lake. Ma następujące pola podrzędne:
    • repository: nazwa rejestru kontenerów i repozytorium, w którym jest przechowywany obraz.
    • tag: tag obrazu do użycia.
    • pullPolicy: zasady ściągania obrazu. Może to być jeden z Alwayselementów , IfNotPresentlub Never.
  • instances: liczba replik łącznika data lake do uruchomienia.
  • logLevel: poziom dziennika dla modułu łącznika usługi Data Lake. Może to być jeden z traceelementów , debug, info, warn, errorlub fatal.
  • databaseFormat: format danych do pozyskiwania w usłudze Data Lake Storage. Może to być jeden z delta lub parquet.
  • target: pole docelowe określa miejsce docelowe pozyskiwania danych. Może to być datalakeStorage, , adxfabricOneLakelub localStorage.
    • datalakeStorage: określa konfigurację i właściwości konta ADLSv2. Ma następujące pola podrzędne:
      • endpoint: adres URL punktu końcowego konta usługi Data Lake Storage. Nie dołączaj żadnego ukośnika /końcowego .
      • authentication: Pole uwierzytelniania określa typ i poświadczenia dostępu do konta usługi Data Lake Storage. Może to być jeden z następujących elementów.
        • accessTokenSecretName: nazwa wpisu tajnego kubernetes do korzystania z uwierzytelniania tokenu dostępu współdzielonego dla konta usługi Data Lake Storage. To pole jest wymagane, jeśli typ to accessToken.
        • systemAssignedManagedIdentity: w przypadku korzystania z tożsamości zarządzanej przez system na potrzeby uwierzytelniania. Ma jedno pole podrzędne
          • audience: ciąg w postaci https://<my-account-name>.blob.core.windows.net grupy odbiorców tokenu tożsamości zarządzanej o określonym zakresie na poziomie konta lub https://storage.azure.com dla dowolnego konta magazynu.
    • fabricOneLake: określa konfigurację i właściwości usługi Microsoft Fabric OneLake. Ma następujące pola podrzędne:
      • endpoint: adres URL punktu końcowego usługi Microsoft Fabric OneLake. Zazwyczaj jest https://onelake.dfs.fabric.microsoft.com to punkt końcowy globalny OneLake. Jeśli używasz regionalnego punktu końcowego, jest on w postaci https://<region>-onelake.dfs.fabric.microsoft.com. Nie dołączaj żadnego ukośnika /końcowego . Aby dowiedzieć się więcej, zobacz Połączenie do usługi Microsoft OneLake.
      • names: określa nazwy obszaru roboczego i lakehouse. Użyj tego pola lub guids. Nie używaj obu tych elementów. Ma następujące pola podrzędne:
        • workspaceName: nazwa obszaru roboczego.
        • lakehouseName: nazwa jeziora.
      • guids: określa identyfikatory GUID obszaru roboczego i lakehouse. Użyj tego pola lub names. Nie używaj obu tych elementów. Ma następujące pola podrzędne:
        • workspaceGuid: identyfikator GUID obszaru roboczego.
        • lakehouseGuid: identyfikator GUID lakehouse.
      • fabricPath: lokalizacja danych w obszarze roboczym Sieć szkieletowa. Może to być wartość tables lub files. Jeśli tak tables, dane są przechowywane w usłudze Fabric OneLake jako tabele. Jeśli jest filesto , dane są przechowywane w usłudze Fabric OneLake jako pliki. Jeśli ma wartość files, databaseFormat musi to być parquet.
      • authentication: Pole uwierzytelniania określa typ i poświadczenia dostępu do usługi Microsoft Fabric OneLake. To może być systemAssignedManagedIdentity tylko na razie. Ma jedno pole podrzędne:
      • systemAssignedManagedIdentity: w przypadku korzystania z tożsamości zarządzanej przez system na potrzeby uwierzytelniania. Ma jedno pole podrzędne
        • audience: ciąg dla odbiorców tokenu tożsamości zarządzanej i musi to być https://storage.azure.com.
    • adx: określa konfigurację i właściwości bazy danych usługi Azure Data Explorer. Ma następujące pola podrzędne:
      • endpoint: adres URL punktu końcowego klastra usługi Azure Data Explorer, taki jak https://<CLUSTER>.<REGION>.kusto.windows.net. Nie dołączaj żadnego ukośnika /końcowego .
      • authentication: Pole uwierzytelniania określa typ i poświadczenia na potrzeby uzyskiwania dostępu do klastra usługi Azure Data Explorer. To może być systemAssignedManagedIdentity tylko na razie. Ma jedno pole podrzędne:
        • systemAssignedManagedIdentity: w przypadku korzystania z tożsamości zarządzanej przez system na potrzeby uwierzytelniania. Ma jedno pole podrzędne
          • audience: ciąg dla odbiorców tokenu tożsamości zarządzanej i powinien to być https://api.kusto.windows.net.
    • localStorage: określa konfigurację i właściwości lokalnego konta magazynu. Ma następujące pola podrzędne:
      • volumeName: nazwa woluminu zainstalowanego w każdym zasobniku łącznika.
  • localBrokerConnection: służy do zastępowania domyślnej konfiguracji połączenia z brokerem MQTT IoT MQTT. Zobacz Zarządzanie lokalnym połączeniem brokera.

DataLake Połączenie orTopicMap

DataLake Połączenie orTopicMap to niestandardowy zasób kubernetes definiujący mapowanie między tematem MQTT a tabelą delty na koncie usługi Data Lake Storage. Zasób DataLake Połączenie orTopicMap odwołuje się do zasobu DataLake Połączenie or działającego na tym samym urządzeniu brzegowym i pozyskiwania danych z tematu MQTT do tabeli delty.

Pole specyfikacji zasobu DataLake Połączenie orTopicMap zawiera następujące pola podrzędne:

  • dataLakeConnectorRef: nazwa zasobu DataLake Połączenie or, do którego należy ta mapa tematu.
  • mapping: Pole mapowania określa szczegóły i właściwości tematu MQTT oraz tabelę delta. Ma następujące pola podrzędne:
    • allowedLatencySecs: maksymalne opóźnienie w sekundach między odbieraniem komunikatu z tematu MQTT i pozyskiwaniem go do tabeli delty. To pole jest wymagane.
    • clientId: unikatowy identyfikator klienta MQTT, który subskrybuje temat.
    • maxMessagesPerBatch: maksymalna liczba komunikatów do pozyskiwania w jednej partii do tabeli delty. Ze względu na tymczasowe ograniczenie ta wartość musi być mniejsza niż 16, jeśli qos jest ustawiona na 1. To pole jest wymagane.
    • messagePayloadType: typ ładunku wysyłanego do tematu MQTT. Może to być jeden z json lub avro (jeszcze nieobsługiwany).
    • mqttSourceTopic: nazwa tematów MQTT do subskrybowania. Obsługuje notację wieloznacznych symboli wieloznacznych w temacie MQTT.
    • qos: jakość poziomu usług dla subskrybowania tematu MQTT. Może to być jeden z 0 lub 1.
    • table: Pole tabeli określa konfigurację i właściwości tabeli delta na koncie usługi Data Lake Storage. Ma następujące pola podrzędne:
      • tableName: nazwa tabeli delty do utworzenia lub dołączenia do elementu na koncie usługi Data Lake Storage. To pole jest również nazywane nazwą kontenera w przypadku użycia z usługą Azure Data Lake Storage Gen2. Może zawierać dowolną małą literę angielską i znak underbar _, o długości do 256 znaków. Nie są dozwolone żadne kreski - ani znaki spacji.
      • tablePath: nazwa bazy danych usługi Azure Data Explorer podczas korzystania z adx łącznika typów.
      • schema: schemat tabeli delty, który powinien być zgodny z formatem i polami ładunku komunikatu. Jest to tablica obiektów z następującymi polami podrzędnymi:
        • name: nazwa kolumny w tabeli delty.
        • format: typ danych kolumny w tabeli delty. Może to być jeden z booleanelementów , int8, int32int64uInt32uInt64uInt16float16float32uInt8int16date32timestampfloat64binarylub .utf8 Typy niepodpisane, takie jak uInt8, nie są w pełni obsługiwane i są traktowane jako typy podpisane, jeśli określono tutaj.
        • optional: wartość logiczna wskazująca, czy kolumna jest opcjonalna, czy wymagana. To pole jest opcjonalne i domyślnie ma wartość false.
        • mapping: wyrażenie ścieżki JSON definiujące sposób wyodrębniania wartości kolumny z ładunku komunikatu MQTT. Wbudowane mapowania $client_id, , $topic$propertyi $received_time są dostępne do użycia jako kolumny w celu wzbogacania kodu JSON w treści komunikatu MQTT. To pole jest wymagane. Użyj $property dla właściwości użytkownika MQTT. Na przykład $property.assetId reprezentuje wartość właściwości assetId z komunikatu MQTT.

Oto przykład zasobu DataLake Połączenie orTopicMap:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: datalake-topicmap
  namespace: azure-iot-operations
spec:
  dataLakeConnectorRef: my-datalake-connector
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      tableName: thermostat
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

Ciągowany kod JSON, taki jak "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}" nie jest obsługiwany i powoduje, że łącznik zgłasza błąd wartości null przez łącznik.

Przykładowy komunikat dotyczący tematu azure-iot-operations/data/thermostat , który działa z tym schematem:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

Które mapuje na:

externalAssetId assetName CurrentTemperature Ciśnienie mqttTopic timestamp
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx termostat-de 5506 5506 Dlc 2024-04-02T22:36:03.1827681Z

Ważne

Jeśli schemat danych zostanie zaktualizowany, na przykład typ danych zostanie zmieniony lub nazwa zostanie zmieniona, przekształcenie danych przychodzących może przestać działać. Jeśli wystąpi zmiana schematu, musisz zmienić nazwę tabeli danych.

Delta lub parquet

Obsługiwane są formaty różnicowe i parquet.

Zarządzanie lokalnym połączeniem brokera

Podobnie jak mostek MQTT, łącznik usługi Data Lake działa jako klient brokera MQTT IoT MQTT. Jeśli dostosowano port odbiornika lub uwierzytelnianie brokera MQTT IoT MQTT, przesłoń również lokalną konfigurację połączenia MQTT dla łącznika usługi Data Lake. Aby dowiedzieć się więcej, zobacz Połączenie lokalnego brokera mostka MQTT.

Publikowanie i subskrybowanie komunikatów MQTT przy użyciu usługi Azure IoT MQ (wersja zapoznawcza)