Mengirim data dari Pratinjau Azure IoT MQ ke Data Lake Storage

Penting

Pratinjau Operasi Azure IoT – diaktifkan oleh Azure Arc saat ini dalam PRATINJAU. Anda tidak boleh menggunakan perangkat lunak pratinjau ini di lingkungan produksi.

Lihat Ketentuan Penggunaan Tambahan untuk Pratinjau Microsoft Azure untuk persyaratan hukum yang berlaku pada fitur Azure dalam versi beta, pratinjau, atau belum dirilis secara umum.

Anda dapat menggunakan konektor data lake untuk mengirim data dari broker Pratinjau Azure IoT MQ ke data lake, seperti Azure Data Lake Storage Gen2 (ADLSv2), Microsoft Fabric OneLake, dan Azure Data Explorer. Konektor berlangganan topik MQTT dan menyerap pesan ke dalam tabel Delta di akun Data Lake Storage.

Prasyarat

Mengonfigurasi untuk mengirim data ke Microsoft Fabric OneLake menggunakan identitas terkelola

Konfigurasikan konektor data lake untuk terhubung ke Microsoft Fabric OneLake menggunakan identitas terkelola.

  1. Pastikan bahwa langkah-langkah dalam prasyarat terpenuhi, termasuk ruang kerja Microsoft Fabric dan lakehouse. Ruang kerja saya default tidak dapat digunakan.

  2. Pastikan ekstensi IoT MQ Arc diinstal dan dikonfigurasi dengan identitas terkelola.

  3. Di portal Azure, buka kluster Kubernetes yang terhubung dengan Arc dan pilih Pengaturan> Ekstensi. Dalam daftar ekstensi, cari nama ekstensi IoT MQ Anda. Nama dimulai dengan mq- diikuti oleh lima karakter acak. Misalnya, mq-4jgjs.

  4. Dapatkan ID aplikasi yang terkait dengan identitas terkelola ekstensi IoT MQ Arc, dan catat nilai GUID. ID aplikasi berbeda dari objek atau ID utama. Anda dapat menggunakan Azure CLI dengan menemukan ID objek identitas terkelola lalu mengkueri ID aplikasi perwakilan layanan yang terkait dengan identitas terkelola. Contohnya:

    OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv)
    az ad sp show --query appId --id $OBJECT_ID --output tsv
    

    Anda harus mendapatkan output dengan nilai GUID:

    xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    

    GUID ini adalah ID aplikasi yang perlu Anda gunakan di langkah berikutnya.

  5. Di ruang kerja Microsoft Fabric, gunakan Kelola akses, lalu pilih + Tambahkan orang atau grup.

  6. Cari ekstensi IoT MQ Arc dengan namanya "mq", dan pastikan untuk memilih nilai GUID ID aplikasi yang Anda temukan di langkah sebelumnya.

  7. Pilih Kontributor sebagai peran, lalu pilih Tambahkan.

  8. Buat sumber daya DataLake Koneksi or yang menentukan pengaturan konfigurasi dan titik akhir untuk konektor. Anda dapat menggunakan YAML yang disediakan sebagai contoh, tetapi pastikan untuk mengubah bidang berikut:

    • target.fabricOneLake.endpoint: Titik akhir akun Microsoft Fabric OneLake. Anda bisa mendapatkan URL titik akhir dari Microsoft Fabric lakehouse di bawah Properti File>. URL akan terlihat seperti https://onelake.dfs.fabric.microsoft.com.
    • target.fabricOneLake.names: Nama-nama ruang kerja dan lakehouse. Gunakan bidang ini atau guids. Jangan gunakan keduanya.
      • workspaceName: Nama ruang kerja.
      • lakehouseName: Nama lakehouse.
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: info
      databaseFormat: delta
      target:
        fabricOneLake:
          # Example: https://onelake.dfs.fabric.microsoft.com
          endpoint: <example-endpoint-url>
          names:
            workspaceName: <example-workspace-name>
            lakehouseName: <example-lakehouse-name>
          ## OR
          # guids:
          #   workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          #   lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          fabricPath: tables
          authentication:
            systemAssignedManagedIdentity:
              audience: https://storage.azure.com/
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  9. Buat sumber daya DataLake Koneksi orTopicMap yang menentukan pemetaan antara topik MQTT dan tabel Delta di Data Lake Storage. Anda dapat menggunakan YAML yang disediakan sebagai contoh, tetapi pastikan untuk mengubah bidang berikut:

    • dataLakeConnectorRef: Nama sumber daya DataLake Koneksi or yang Anda buat sebelumnya.
    • clientId: Pengidentifikasi unik untuk klien MQTT Anda.
    • mqttSourceTopic: Nama topik MQTT yang Anda inginkan untuk berasal dari data.
    • table.tableName: Nama tabel yang ingin Anda tambahkan di lakehouse. Tabel dibuat secara otomatis jika tidak ada.
    • table.schema: Skema tabel Delta yang harus cocok dengan format dan bidang pesan JSON yang Anda kirim ke topik MQTT.
  10. Terapkan sumber daya DataLake Koneksi or dan DataLake Koneksi orTopicMap ke kluster Kubernetes Anda menggunakan kubectl apply -f datalake-connector.yaml.

  11. Mulai kirim pesan JSON ke topik MQTT menggunakan penerbit MQTT Anda. Instans konektor data lake berlangganan topik dan menyerap pesan ke dalam tabel Delta.

  12. Menggunakan browser, verifikasi bahwa data diimpor ke lakehouse. Di ruang kerja Microsoft Fabric, pilih lakehouse Anda lalu Tabel. Anda akan melihat data dalam tabel.

Tabel tidak teridentifikasi

Jika data Anda ditampilkan dalam tabel Tidak Teridentifikasi :

Penyebabnya mungkin karakter yang tidak didukung dalam nama tabel. Nama tabel harus berupa nama kontainer Azure Storage yang valid yang berarti dapat berisi huruf bahasa Inggris, huruf besar atau kecil, dan underbar _, dengan panjang hingga 256 karakter. Tidak ada tanda hubung - atau karakter spasi yang diizinkan.

Mengonfigurasi untuk mengirim data ke Azure Data Lake Storage Gen2 menggunakan token SAS

Konfigurasikan konektor data lake untuk menyambungkan ke akun Azure Data Lake Storage Gen2 (ADLS Gen2) menggunakan token tanda tangan akses bersama (SAS).

  1. Dapatkan token SAS untuk akun Azure Data Lake Storage Gen2 (ADLS Gen2). Misalnya, gunakan portal Azure untuk menelusuri ke akun penyimpanan Anda. Di menu di bawah Keamanan + jaringan, pilih Tanda tangan akses bersama. Gunakan tabel berikut untuk mengatur izin yang diperlukan.

    Parameter Nilai
    Layanan yang diizinkan Blob
    Jenis sumber daya yang diizinkan Objek, Kontainer
    Izin yang diizinkan Baca, Tulis, Hapus, Daftar, Buat

    Untuk mengoptimalkan hak istimewa paling sedikit, Anda juga dapat memilih untuk mendapatkan SAS untuk kontainer individual. Untuk mencegah kesalahan autentikasi, pastikan kontainer cocok dengan table.tableName nilai dalam konfigurasi peta topik.

  2. Buat rahasia Kubernetes dengan token SAS. Jangan sertakan tanda ? tanya yang mungkin ada di awal token.

    kubectl create secret generic my-sas \
    --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \
    -n azure-iot-operations
    
  3. Buat sumber daya DataLake Koneksi or yang menentukan pengaturan konfigurasi dan titik akhir untuk konektor. Anda dapat menggunakan YAML yang disediakan sebagai contoh, tetapi pastikan untuk mengubah bidang berikut:

    • endpoint: Titik akhir Data Lake Storage dari akun penyimpanan ADLSv2 dalam bentuk https://example.blob.core.windows.net. Di portal Azure, temukan titik akhir di bawah Akun > penyimpanan Pengaturan > Titik > Akhir Data Lake Storage.
    • accessTokenSecretName: Nama rahasia Kubernetes yang berisi token SAS (my-sas dari contoh sebelumnya).
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: "debug"
      databaseFormat: "delta"
      target:
        datalakeStorage:
          endpoint: "https://example.blob.core.windows.net"
          authentication:
            accessTokenSecretName: "my-sas"
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  4. Buat sumber daya DataLake Koneksi orTopicMap yang menentukan pemetaan antara topik MQTT dan tabel Delta di Data Lake Storage. Anda dapat menggunakan YAML yang disediakan sebagai contoh, tetapi pastikan untuk mengubah bidang berikut:

    • dataLakeConnectorRef: Nama sumber daya DataLake Koneksi or yang Anda buat sebelumnya.
    • clientId: Pengidentifikasi unik untuk klien MQTT Anda.
    • mqttSourceTopic: Nama topik MQTT yang Anda inginkan untuk berasal dari data.
    • table.tableName: Nama kontainer yang ingin Anda tambahkan di Data Lake Storage. Jika token SAS dilingkup ke akun, kontainer secara otomatis dibuat jika hilang.
    • table.schema: Skema tabel Delta, yang harus cocok dengan format dan bidang pesan JSON yang Anda kirim ke topik MQTT.
  5. Terapkan sumber daya DataLake Koneksi or dan DataLake Koneksi orTopicMap ke kluster Kubernetes Anda menggunakan kubectl apply -f datalake-connector.yaml.

  6. Mulai kirim pesan JSON ke topik MQTT menggunakan penerbit MQTT Anda. Instans konektor data lake berlangganan topik dan menyerap pesan ke dalam tabel Delta.

  7. Menggunakan portal Azure, verifikasi bahwa tabel Delta dibuat. File diatur berdasarkan ID klien, nama instans konektor, topik MQTT, dan waktu. Di Kontainer akun> penyimpanan Anda, buka kontainer yang Anda tentukan di DataLake Koneksi orTopicMap. Verifikasi _delta_log ada dan file parque menunjukkan lalu lintas MQTT. Buka file parque untuk mengonfirmasi payload yang cocok dengan apa yang dikirim dan ditentukan dalam skema.

Menggunakan identitas terkelola untuk autentikasi ke ADLSv2

Untuk menggunakan identitas terkelola, tentukan sebagai satu-satunya metode di bawah DataLake Koneksi or authentication. Gunakan az k8s-extension show untuk menemukan ID utama untuk ekstensi IoT MQ Arc, lalu tetapkan peran ke identitas terkelola yang memberikan izin untuk menulis ke akun penyimpanan, seperti Kontributor Data Blob Penyimpanan. Untuk mempelajari selengkapnya, lihat Mengotorisasi akses ke blob menggunakan ID Microsoft Entra.

authentication:
  systemAssignedManagedIdentity:
    audience: https://my-account.blob.core.windows.net

Mengonfigurasi untuk mengirim data ke Azure Data Explorer menggunakan identitas terkelola

Konfigurasikan konektor data lake untuk mengirim data ke titik akhir Azure Data Explorer menggunakan identitas terkelola.

  1. Pastikan bahwa langkah-langkah dalam prasyarat terpenuhi, termasuk kluster Azure Data Explorer lengkap. Opsi "kluster gratis" tidak berfungsi.

  2. Setelah kluster dibuat, buat database untuk menyimpan data Anda.

  3. Anda bisa membuat tabel untuk data tertentu melalui portal Azure dan membuat kolom secara manual, atau Anda bisa menggunakan KQL di tab kueri. Misalnya:

    .create table thermostat (
        externalAssetId: string,
        assetName: string,
        CurrentTemperature: real,
        Pressure: real,
        MqttTopic: string,
        Timestamp: datetime
    )
    

Mengaktifkan penyerapan streaming

Aktifkan penyerapan streaming pada tabel dan database Anda. Di tab kueri, jalankan perintah berikut, ganti <DATABASE_NAME> dengan nama database Anda:

.alter database <DATABASE_NAME> policy streamingingestion enable

Menambahkan identitas terkelola ke kluster Azure Data Explorer

Agar konektor dapat mengautentikasi ke Azure Data Explorer, Anda harus menambahkan identitas terkelola ke kluster Azure Data Explorer.

  1. Di portal Azure, buka kluster Kubernetes yang terhubung dengan Arc dan pilih Pengaturan> Ekstensi. Dalam daftar ekstensi, cari nama ekstensi IoT MQ Anda. Nama dimulai dengan mq- diikuti oleh lima karakter acak. Misalnya, mq-4jgjs. Nama ekstensi IoT MQ sama dengan nama identitas terkelola MQ.
  2. Di database Azure Data Explorer Anda, pilih Izin>Tambahkan>Ingestor. Cari nama identitas terkelola MQ dan tambahkan.

Untuk informasi selengkapnya tentang menambahkan izin, lihat Mengelola izin kluster Azure Data Explorer.

Sekarang, Anda siap untuk menyebarkan konektor dan mengirim data ke Azure Data Explorer.

Contoh file penyebaran

Contoh file penyebaran untuk konektor Azure Data Explorer. Komentar yang dimulai dengan TODO mengharuskan Anda mengganti pengaturan tempat penampung dengan informasi Anda.

apiVersion: mq.iotoperations.azure.com/v1beta1
  name: my-adx-connector
  namespace: azure-iot-operations
spec:
    repository: mcr.microsoft.com/azureiotoperations/datalake
    tag: 0.4.0-preview
    pullPolicy: Always
  databaseFormat: adx
  target:
      # TODO: insert the ADX cluster endpoint
      endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
      authentication:
        systemAssignedManagedIdentity:
          audience: https://api.kusto.windows.net
  localBrokerConnection:
    endpoint: aio-mq-dmqtt-frontend:8883
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
    authentication:
      kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: adx-topicmap
  namespace: azure-iot-operations
spec:
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      # TODO: add DB and table name
      tablePath: <DATABASE_NAME>
      tableName: <TABLE_NAME>
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: MqttTopic
        format: utf8
        optional: false
        mapping: $topic
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

Contoh ini menerima data dari azure-iot-operations/data/thermostat topik dengan pesan dalam format JSON seperti berikut ini:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

DataLake Koneksi or

DataLake Koneksi or adalah sumber daya kustom Kubernetes yang menentukan konfigurasi dan properti instans konektor data lake. Konektor data lake menyerap data dari topik MQTT ke dalam tabel Delta di akun Data Lake Storage.

Bidang spesifikasi sumber daya DataLake Koneksi or berisi subbidang berikut:

  • protocol: Versi MQTT. Ini bisa menjadi salah satu dari v5 atau v3.
  • image: Bidang gambar menentukan gambar kontainer modul konektor data lake. Ini memiliki subbidang berikut:
    • repository: Nama registri dan repositori kontainer tempat gambar disimpan.
    • tag: Tag gambar yang akan digunakan.
    • pullPolicy: Kebijakan penarikan untuk gambar. Ini bisa menjadi salah satu dari Always, IfNotPresent, atau Never.
  • instances: Jumlah replika konektor data lake yang akan dijalankan.
  • logLevel: Tingkat log untuk modul konektor data lake. Ini bisa menjadi salah satu dari trace, debug, info, warn, error, atau fatal.
  • databaseFormat: Format data yang akan diserap ke dalam Data Lake Storage. Ini bisa menjadi salah satu dari delta atau parquet.
  • target: Bidang target menentukan tujuan penyerapan data. Ini bisa , datalakeStorage, fabricOneLakeadx, atau localStorage.
    • datalakeStorage: Menentukan konfigurasi dan properti akun ADLSv2. Ini memiliki subbidang berikut:
      • endpoint: URL titik akhir akun Data Lake Storage. Jangan sertakan garis miring /berikutnya.
      • authentication: Bidang autentikasi menentukan jenis dan kredensial untuk mengakses akun Data Lake Storage. Ini bisa menjadi salah satu dari berikut ini.
        • accessTokenSecretName: Nama rahasia Kubernetes untuk menggunakan autentikasi token akses bersama untuk akun Data Lake Storage. Bidang ini diperlukan jika jenisnya adalah accessToken.
        • systemAssignedManagedIdentity: Untuk menggunakan identitas terkelola sistem untuk autentikasi. Ini memiliki satu subbidang
          • audience: String dalam bentuk https://<my-account-name>.blob.core.windows.net untuk audiens token identitas terkelola yang dilingkup ke tingkat akun atau https://storage.azure.com untuk akun penyimpanan apa pun.
    • fabricOneLake: Menentukan konfigurasi dan properti Microsoft Fabric OneLake. Ini memiliki subbidang berikut:
      • endpoint: URL titik akhir Microsoft Fabric OneLake. Biasanya https://onelake.dfs.fabric.microsoft.com karena itu adalah titik akhir global OneLake. Jika Anda menggunakan titik akhir regional, titik akhir tersebut dalam bentuk https://<region>-onelake.dfs.fabric.microsoft.com. Jangan sertakan garis miring /berikutnya. Untuk mempelajari selengkapnya, lihat Koneksi ke Microsoft OneLake.
      • names: Menentukan nama ruang kerja dan lakehouse. Gunakan bidang ini atau guids. Jangan gunakan keduanya. Ini memiliki subbidang berikut:
        • workspaceName: Nama ruang kerja.
        • lakehouseName: Nama lakehouse.
      • guids: Menentukan GUID ruang kerja dan lakehouse. Gunakan bidang ini atau names. Jangan gunakan keduanya. Ini memiliki subbidang berikut:
        • workspaceGuid: GUID ruang kerja.
        • lakehouseGuid: GUID lakehouse.
      • fabricPath: Lokasi data di ruang kerja Fabric. Ini bisa berupa tables atau files. Jika itu tables, data disimpan di Fabric OneLake sebagai tabel. Jika itu files, data disimpan di Fabric OneLake sebagai file. Jika itu files, databaseFormat harus parquet.
      • authentication: Bidang autentikasi menentukan jenis dan kredensial untuk mengakses Microsoft Fabric OneLake. Ini hanya bisa systemAssignedManagedIdentity untuk saat ini. Ini memiliki satu subbidang:
      • systemAssignedManagedIdentity: Untuk menggunakan identitas terkelola sistem untuk autentikasi. Ini memiliki satu subbidang
        • audience: String untuk audiens token identitas terkelola dan harus https://storage.azure.com.
    • adx: Menentukan konfigurasi dan properti database Azure Data Explorer. Ini memiliki subbidang berikut:
      • endpoint: URL titik akhir kluster Azure Data Explorer seperti https://<CLUSTER>.<REGION>.kusto.windows.net. Jangan sertakan garis miring /berikutnya.
      • authentication: Bidang autentikasi menentukan jenis dan kredensial untuk mengakses kluster Azure Data Explorer. Ini hanya bisa systemAssignedManagedIdentity untuk saat ini. Ini memiliki satu subbidang:
        • systemAssignedManagedIdentity: Untuk menggunakan identitas terkelola sistem untuk autentikasi. Ini memiliki satu subbidang
          • audience: String untuk audiens token identitas terkelola dan harus https://api.kusto.windows.net.
    • localStorage: Menentukan konfigurasi dan properti akun penyimpanan lokal. Ini memiliki subbidang berikut:
      • volumeName: Nama volume yang dipasang ke dalam masing-masing pod konektor.
  • localBrokerConnection: Digunakan untuk mengambil alih konfigurasi koneksi default ke broker IoT MQ MQTT. Lihat Mengelola koneksi broker lokal.

DataLake Koneksi orTopicMap

DataLake Koneksi orTopicMap adalah sumber daya kustom Kubernetes yang menentukan pemetaan antara topik MQTT dan tabel Delta di akun Data Lake Storage. Sumber daya DataLake Koneksi orTopicMap mereferensikan sumber daya DataLake Koneksi or yang berjalan pada perangkat edge yang sama dan menyerap data dari topik MQTT ke dalam tabel Delta.

Bidang spesifikasi sumber daya DataLake Koneksi orTopicMap berisi subbidang berikut:

  • dataLakeConnectorRef: Nama sumber daya DataLake Koneksi or yang dimiliki peta topik ini.
  • mapping: Bidang pemetaan menentukan detail dan properti topik MQTT dan tabel Delta. Ini memiliki subbidang berikut:
    • allowedLatencySecs: Latensi maksimum dalam detik antara menerima pesan dari topik MQTT dan menyerapnya ke dalam tabel Delta. Bidang ini wajib diisi.
    • clientId: Pengidentifikasi unik untuk klien MQTT yang berlangganan topik tersebut.
    • maxMessagesPerBatch: Jumlah maksimum pesan yang akan diserap dalam satu batch ke dalam tabel Delta. Karena pembatasan sementara, nilai ini harus kurang dari 16 jika qos diatur ke 1. Bidang ini wajib diisi.
    • messagePayloadType: Jenis payload yang dikirim ke topik MQTT. Ini bisa menjadi salah satu dari json atau avro (belum didukung).
    • mqttSourceTopic: Nama topik MQTT untuk berlangganan. Mendukung notasi wildcard topik MQTT.
    • qos: Kualitas tingkat layanan untuk berlangganan topik MQTT. Ini bisa menjadi salah satu dari 0 atau 1.
    • table: Bidang tabel menentukan konfigurasi dan properti tabel Delta di akun Data Lake Storage. Ini memiliki subbidang berikut:
      • tableName: Nama tabel Delta yang akan dibuat atau ditambahkan di akun Data Lake Storage. Bidang ini juga dikenal sebagai nama kontainer saat digunakan dengan Azure Data Lake Storage Gen2. Ini dapat berisi huruf kecil bahasa Inggris, dan underbar _, dengan panjang hingga 256 karakter. Tidak ada tanda hubung - atau karakter spasi yang diizinkan.
      • tablePath: Nama database Azure Data Explorer saat menggunakan adx konektor jenis.
      • schema: Skema tabel Delta, yang harus cocok dengan format dan bidang payload pesan. Ini adalah array objek, masing-masing dengan subbidang berikut:
        • name: Nama kolom dalam tabel Delta.
        • format: Jenis data kolom dalam tabel Delta. Ini bisa menjadi salah satu dari boolean, , int8, int16int32, int64, uInt8, uInt16, uInt64uInt32, , float16, float32, float64, timestampdate32, binary, , atau utf8. Jenis yang tidak ditandatangani, seperti uInt8, tidak didukung sepenuhnya, dan diperlakukan sebagai jenis yang ditandatangani jika ditentukan di sini.
        • optional: Nilai boolean yang menunjukkan apakah kolom bersifat opsional atau diperlukan. Bidang ini bersifat opsional dan default ke false.
        • mapping: Ekspresi jalur JSON yang menentukan cara mengekstrak nilai kolom dari payload pesan MQTT. Pemetaan bawaan $client_id, , $topic$properties, dan $received_time tersedia untuk digunakan sebagai kolom untuk memperkaya JSON dalam isi pesan MQTT. Bidang ini wajib diisi. Gunakan $properties untuk properti pengguna MQTT. Misalnya, $properties.assetId mewakili nilai properti assetId dari pesan MQTT.

Berikut adalah contoh sumber daya DataLake Koneksi orTopicMap:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: datalake-topicmap
  namespace: azure-iot-operations
spec:
  dataLakeConnectorRef: my-datalake-connector
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      tableName: thermostat
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

JSON yang di stringifikasi seperti "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}" tidak didukung dan menyebabkan konektor melemparkan konverter menemukan kesalahan nilai null.

Contoh pesan untuk azure-iot-operations/data/thermostat topik yang berfungsi dengan skema ini:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

Peta mana yang akan:

externalAssetId namaAset CurrentTemperature Tekanan mqttTopic rentang waktu
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx termostat-de 5506 5506 Dlc 2024-04-02T22:36:03.1827681Z

Penting

Jika skema data diperbarui, misalnya jenis data diubah atau nama diubah, transformasi data masuk mungkin berhenti berfungsi. Anda perlu mengubah nama tabel data jika terjadi perubahan skema.

Delta atau parkquet

Format delta dan parquet didukung.

Mengelola koneksi broker lokal

Seperti jembatan MQTT, konektor data lake bertindak sebagai klien ke broker MQTT IoT MQTT. Jika Anda telah menyesuaikan port pendengar atau autentikasi broker MQTT IoT MQTT Anda, ambil alih konfigurasi koneksi MQTT lokal untuk konektor data lake juga. Untuk mempelajari lebih lanjut, lihat Koneksi broker lokal jembatan MQTT.

Menerbitkan dan berlangganan pesan MQTT menggunakan Pratinjau Azure IoT MQ