你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

在 IoT Central 应用程序中转换要导出的数据

IoT 设备发送各种格式的数据。 若要在 IoT 解决方案中使用设备数据,可能需要先转换设备数据,然后再将其导出到其他服务。

本文介绍如何在 IoT Central 应用程序中将设备数据转换为数据导出定义的一部分。

使用 IoT Central 数据导出定义中的转换,可以在将设备数据导出到目标之前,先对其格式和结构进行处理。 可为导出定义中的每个目标指定转换。 每条消息将会经历转换,以创建要导出到目标的输出记录。

使用转换可以重新构建 JSON 有效负载、重命名字段、筛选出字段,以及对遥测值运行简单计算。 例如,使用转换可将消息转换为与目标(例如 Azure 数据资源管理器表)的架构匹配的表格格式。

以下视频介绍了 IoT Central 数据转换:

添加转换

若要为数据导出中的目标添加转换,请选择“+ 转换”,如以下屏幕截图所示

Screenshot that shows how to add a transformation to a destination.

使用“数据转换”面板可以指定转换。 在“1. 添加输入消息”部分,可以输入要经历转换的示例消息。 还可以通过选择设备模板来生成示例消息。 在“2. 生成转换查询”部分,可以输入用于转换输入消息的查询。 “3. 预览输出消息”部分显示转换结果

Screenshot of transformation editor in IoT Central.

提示

如果你不知道输入消息的格式,请使用 . 作为查询,将消息按原样导出到目标(例如 Webhook)。 然后将 Webhook 收到的消息粘贴到 1 中。添加输入消息。 然后生成一个转换查询,将此消息处理成所需的输出格式。

生成转换查询

转换引擎使用开源的 JQ JSON 处理器来重新构建和格式化 JSON 有效负载。 若要指定转换,请编写一个 JQ 查询,该查询可以使用 JQ 的内置筛选器、函数和功能。 有关查询示例,请参阅示例转换查询。 若要详细了解如何编写 JQ 查询,请参阅 JQ 手册

预转换消息结构

可以从 IoT Central 导出以下数据流:遥测数据、属性更改、设备连接事件、设备生命周期事件和设备模板生命周期事件。 每种类型的数据都有特定的结构,其中包括遥测值、应用程序信息、设备元数据和属性值等信息。

以下示例演示了遥测消息的形状。 所有这些数据都可用于转换。 其他消息类型的消息结构类似,但具有某些特定于类型的字段。 可以使用“添加输入消息”功能根据应用程序中的设备模板生成示例消息

{
  "applicationId": "93d68c98-9a22-4b28-94d1-06625d4c3d0f",
  "device": {
    "id": "31edabe6-e0b9-4c83-b0df-d12e95745b9f",
    "name": "Scripted Device - 31edabe6-e0b9-4c83-b0df-d12e95745b9f",
    "cloudProperties": [],
    "properties": {
      "reported": [
        {
          "id": "urn:smartKneeBrace:Smart_Vitals_Patch_wr:FirmwareVersion:1",
          "name": "FirmwareVersion",
          "value": 1.0
        }
      ]
    },
    "templateId": "urn:sbq3croo:modelDefinition:nf7st1wn3",
    "templateName": "Smart Knee Brace"
  },
  "telemetry": [
      {
        "id": "urn:continuousPatientMonitoringTemplate:Smart_Knee_Brace_6wm:Acceleration:1",
        "name": "Acceleration",
        "value": {
          "x": 19.212770659918583,
          "y": 20.596296675217335,
          "z": 54.04859440697045
        }
      },
      {
        "id": "urn:continuousPatientMonitoringTemplate:Smart_Knee_Brace_6wm:RangeOfMotion:1",
        "name": "RangeOfMotion",
        "value": 110
      }
  ],
  "enqueuedTime": "2021-03-23T19:55:56.971Z",
  "enrichments": {
      "your-enrichment-key": "enrichment-value"
  },
  "messageProperties": {
      "prop1": "prop-value"
  },
  "messageSource": "telemetry"
}

提示

使用 IoT Central 应用程序 UI 中的“添加输入消息”功能查看其他数据导出类型(例如属性更改)的示例消息结构

示例转换查询

以下查询示例使用上一部分中显示的遥测消息。

示例 1:以下 JQ 查询将输入消息中的每项遥测数据作为单独的输出消息输出

.telemetry[]

JSON 输出:

{
  "id": "urn:continuousPatientMonitoringTemplate:Smart_Knee_Brace_6wm:Acceleration:1",
  "name": "Acceleration",
  "value": {
    "x": 19.212770659918583,
    "y": 20.596296675217335,
    "z": 54.04859440697045
  }
},
{
  "id": "urn:continuousPatientMonitoringTemplate:Smart_Knee_Brace_6wm:RangeOfMotion:1",
  "name": "RangeOfMotion",
  "value": 110
}

提示

若要将输出更改为包含遥测类型数组的单个消息,请使用查询 .telemetry

示例 2:以下 JQ 查询将输入遥测数组转换为以遥测名称作为键的对象

.telemetry | map({ key: .name, value: .value }) | from_entries

JSON 输出:

{
  "Acceleration": {
    "x": 19.212770659918583,
    "y": 20.596296675217335,
    "z": 54.04859440697045
  },
  "RangeOfMotion": 110
}

示例 3:以下 JQ 查询查找 RangeOfMotion 遥测值,并使用公式 rad = degree * pi / 180 将其从度转换为弧度。 此查询还演示如何导入和使用 iotc 模块:

import "iotc" as iotc;
{
  rangeOfMotion: (
    .telemetry
    | iotc::find(.name == "RangeOfMotion").value
    | . * 3.14159265358979323846 / 180
  )
}

JSON 输出:

{
  "rangeOfMotion": 1.9198621771937625
}

示例 4:若要将输入消息处理成表格格式,可将每条导出的消息映射到一个或多个行。 行输出在逻辑上表示为一个 JSON 对象,其中的列名是键,列值是值:

{
    "<column 1 name>": "<column 1 value>",
    "<column 2 name>": "<column 2 value>",
    ...
}

提示

导出到 Azure 数据资源管理器时使用表格格式。

以下 JQ 查询将行写入到跨不同设备存储 rangeOfMotion 遥测数据的表。 该查询将设备 ID、排队时间和运动范围映射到包含以下列的表:

import "iotc" as iotc;
{
    deviceId: .deviceId,
    timestamp: .enqueuedTime,
    rangeOfMotion: .telemetry | iotc::find(.name == "RangeOfMotion").value
}

JSON 格式的输出:

{
  "deviceId": "31edabe6-e0b9-4c83-b0df-d12e95745b9f",
  "timestamp": "2021-03-23T19:55:56.971Z",
  "rangeOfMotion": 110
}

IoT Central 模块

JQ 模块是自定义函数的集合。 作为转换查询的一部分,可以导入一个内置的 IoT Central 特定模块,其中包含使你可以更轻松地编写查询的函数。 若要导入 IoT Central 模块,请使用以下指令:

import "iotc" as iotc;

IoT Central 模块包括以下函数:

find(expression):使用 find 函数可以查找特定的数组元素,例如有效负载中的遥测值或属性条目。 函数输入是一个数组,参数定义了一个针对数组中每个元素运行的 JQ 筛选器。 函数返回筛选器计算结果为 true 的每个数组元素:

例如,若要查找名为 RangeOfMotion 的特定遥测值,请运行:

.telemetry | iotc::find(.name == "RangeOfMotion")

方案

以下方案使用转换功能为特定目标自定义设备数据格式。

方案 1:将设备数据导出到 Azure 数据资源管理器

此方案将转换设备数据以匹配 Azure 数据资源管理器中的固定架构,其中每个遥测值显示为表中的列,每行代表一条消息。 例如:

DeviceId Timestamp T1 T2 T3
"31edabe6-e0b9-4c83-b0df-d12e95745b9f" "2021-03-23T19:55:56.971Z 1.18898 1.434709 2.97008

若要导出与此表兼容的数据,每个导出的消息必须类似于以下对象。 该对象表示单个行,其中的键是列名,值是要放在每个列中的值:

{
    "Timestamp": <value-of-Timestamp>,
    "DeviceId": <value-of-deviceId>,
    "T1": <value-of-T1>,
    "T2": <value-of-T2>,
    "T3": <value-of-T3>,
}

在此方案中,设备在以下示例所示的输入消息中发送 t1t2t3 遥测值:

{
  "applicationId": "c57fe8d9-d15d-4659-9814-d3cc38ca9e1b",
  "enqueuedTime": "1933-01-26T03:10:44.480001324Z",
  "messageSource": "telemetry",
  "telemetry": [
    {
      "id": "dtmi:temperaturesensor288:sensors1lr:t1;1",
      "name": "t1",
      "value": 1.1889838348731093e+308
    },
    {
      "id": "dtmi:temperaturesensor288:sensors1lr:t2;1",
      "name": "t2",
      "value": 1.4347093391531383e+308
    },
    {
      "id": "dtmi:temperaturesensor288:sensors1lr:t3;1",
      "name": "t3",
      "value": 2.9700885230380616e+307
    }
  ],
  "device": {
    "id": "oozrnl1zs857",
    "name": "haptic alarm",
    "templateId": "dtmi:modelDefinition:nhhbjotee:qytxnp8hi",
    "templateName": "hapticsensors",
    "properties": {
      "reported": []
    },
    "cloudProperties": [],
    "simulated": true,
    "approved": false,
    "blocked": false,
    "provisioned": true
  }
}

以下 JQ 查询输出 T1T2T3 遥测值,并输出 TimestampdeviceId 作为消息,其中包含与 Azure 数据资源管理器表架构匹配的键值对:

import "iotc" as iotc;
{
  deviceId: .device.id,
  Timestamp: .enqueuedTime,
  T1: .telemetry | iotc::find(.name == "t1").value,
  T2: .telemetry | iotc::find(.name == "t2").value,
  T3: .telemetry | iotc::find(.name == "t3").value,
}

JSON 输出:

{
  "T1": 1.1889838348731093e+308,
  "T2": 1.4347093391531383e+308,
  "T3": 2.9700885230380616e+307,
  "Timestamp": "1933-01-26T03:10:44.480001324Z",
  "deviceId": "oozrnl1zs857"
}

若要详细了解如何将 Azure 数据资源管理器群集和数据库添加为导出目标,请参阅创建 Azure 数据资源管理器目标

方案 2:分解遥测数组

在此方案中,设备将在一条消息中发送以下遥测数组:

{
  "applicationId": "570c2d7b-d72e-4ad1-aaf4-ad9b727daa47",
  "enqueuedTime": "1909-10-10T07:11:56.078161042Z",
  "messageSource": "telemetry",
  "telemetry": [
    {
      "id": "dtmi:sample1:data;1",
      "name": "data",
      "value": [
        {
          "id": "subdevice1",
          "values": {
              "running": true,
              "cycleCount": 2315
          }
        },
        {
          "id": "subdevice2",
          "values": {
              "running": false,
              "cycleCount": 824567
          }
        }
      ]
    },
    {
      "id": "dtmi:sample1:parentStatus;1",
      "name": "parentStatus",
      "value": "healthy"
    }
  ],
  "device": {
    "id": "9xwhr7khkfri",
    "name": "wireless port",
    "templateId": "dtmi:hpzy1kfcbt2:umua7dplmbd",
    "templateName": "Smart Vitals Patch",
    "properties": {
      "reported": [
        {
          "id": "dtmi:sample1:prop;1",
          "name": "Connectivity",
          "value": "Tenetur ut quasi minus ratione voluptatem."
        }
      ]
    },
    "cloudProperties": [],
    "simulated": true,
    "approved": true,
    "blocked": false,
    "provisioned": false
  }
}

你想要转换此设备数据以匹配以下表架构:

cycleCount deviceId enqueuedTime parentStatus “正在运行” subdeviceId
2315 "9xwhr7khkfri" "1909-10-10T07:11:56.078161042Z" "healthy" "subdevice1"
824567 "9xwhr7khkfri" "1909-10-10T07:11:56.078161042Z" "healthy" false "subdevice2"

以下 JQ 查询为消息中的每个子设备条目创建单独的输出消息,并包括来自基本消息和父设备的一些通用信息。 此查询平展输出,并分离出作为单个消息到达的数据中的逻辑分区:

import "iotc" as iotc;
{
    enqueuedTime: .enqueuedTime,
    deviceId: .device.id,
    parentStatus: .telemetry | iotc::find(.name == "parentStatus").value
} + (
    .telemetry
    | iotc::find(.name == "data").value[]
    | {
        subdeviceId: .id,
        running: .values.running,
        cycleCount: .values.cycleCount
    }
)

JSON 输出:

{
    "cycleCount": 2315,
    "deviceId": "9xwhr7khkfri",
    "enqueuedTime": "1909-10-10T07:11:56.078161042Z",
    "parentStatus": "healthy",
    "running": true,
    "subdeviceId": "subdevice1"
},
{
    "cycleCount": 824567,
    "deviceId": "9xwhr7khkfri",
    "enqueuedTime": "1909-10-10T07:11:56.078161042Z",
    "parentStatus": "healthy",
    "running": false,
    "subdeviceId": "subdevice2"
}

方案 3:Power BI 流式处理

使用 Power BI 实时流式处理功能,可以在仪表板中以较低的延迟查看实时更新的数据。 有关详细信息,请参阅 Power BI 中的实时流式处理

若要将 IoT Central 与 Power BI 流式处理结合使用,请设置以特定格式发送请求正文的 Webhook 导出。 此示例假设你有一个采用以下架构的 Power BI 流式处理数据集:


  {
    "bloodPressureDiastolic": 161438124,
    "bloodPressureSystolic": -966387879,
    "deviceId": "9xwhr7khkfri",
    "deviceName": "wireless port",
    "heartRate": -633994413,
    "heartRateVariability": -37514094,
    "respiratoryRate": 1582211310,
    "timestamp": "1909-10-10T07:11:56.078161042Z"
  }

若要创建 webhook 导出目标,需要 Power BI 流式处理数据集的 REST API URL 终结点。

在此方案中,设备将发送以下示例所示的遥测消息:

{
  "applicationId": "570c2d7b-d72e-4ad1-aaf4-ad9b727daa47",
  "enqueuedTime": "1909-10-10T07:11:56.078161042Z",
  "messageSource": "telemetry",
  "telemetry": [
    {
      "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_37p:HeartRate;1",
      "name": "HeartRate",
      "value": -633994413
    },
    {
      "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_37p:RespiratoryRate;1",
      "name": "RespiratoryRate",
      "value": 1582211310
    },
    {
      "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_37p:HeartRateVariability;1",
      "name": "HeartRateVariability",
      "value": -37514094
    },
    {
      "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_37p:BodyTemperature;1",
      "name": "BodyTemperature",
      "value": 5.323322666478241e+307
    },
    {
      "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_37p:FallDetection;1",
      "name": "FallDetection",
      "value": "Earum est nobis at voluptas id qui."
    },
    {
      "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_37p:BloodPressure;1",
      "name": "BloodPressure",
      "value": {
        "Diastolic": 161438124,
        "Systolic": -966387879
      }
    }
  ],
  "device": {
    "id": "9xwhr7khkfri",
    "name": "wireless port",
    "templateId": "dtmi:hpzy1kfcbt2:umua7dplmbd",
    "templateName": "Smart Vitals Patch",
    "properties": {
      "reported": [
        {
          "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_wr:DeviceStatus;1",
          "name": "DeviceStatus",
          "value": "Id optio iste vero et neque sit."
        }
      ]
    },
    "cloudProperties": [],
    "simulated": true,
    "approved": true,
    "blocked": false,
    "provisioned": false
  }
}

以下 JQ 查询将输入消息转换为合适的格式,使 Webhook 可将其发送到 Power BI 流式处理数据集。 此示例包含一个筛选条件,以便仅输出特定设备模板的消息。 可以使用数据导出筛选器功能按设备模板进行筛选:

import "iotc" as iotc;
if .device.templateId == "dtmi:hpzy1kfcbt2:umua7dplmbd" then 
    {
        deviceId: .device.id,
        timestamp: .enqueuedTime,
        deviceName: .device.name,
        bloodPressureSystolic: .telemetry | iotc::find(.name == "BloodPressure").value.Systolic,
        bloodPressureDiastolic: .telemetry | iotc::find(.name == "BloodPressure").value.Diastolic,
        heartRate: .telemetry | iotc::find(.name == "HeartRate").value,
        heartRateVariability: .telemetry | iotc::find(.name == "HeartRateVariability").value,
        respiratoryRate: .telemetry | iotc::find(.name == "RespiratoryRate").value
    }
else
    empty
end

JSON 输出:

{
  "bloodPressureDiastolic": 161438124,
  "bloodPressureSystolic": -966387879,
  "deviceId": "9xwhr7khkfri",
  "deviceName": "wireless port",
  "heartRate": -633994413,
  "heartRateVariability": -37514094,
  "respiratoryRate": 1582211310,
  "timestamp": "1909-10-10T07:11:56.078161042Z"
}

方案 4:将数据导出到 Azure 数据资源管理器并在 Power BI 中可视化

此方案将数据导出到 Azure 数据资源管理器,然后使用连接器在 Power BI 中可视化数据。 若要详细了解如何将 Azure 数据资源管理器群集和数据库添加为导出目标,请参阅创建 Azure 数据资源管理器目标

此方案使用采用以下架构的 Azure 数据资源管理器表:

.create table smartvitalspatch (
  EnqueuedTime:datetime,
  Message:string,
  Application:string,
  Device:string,
  Simulated:boolean,
  Template:string,
  Module:string,
  Component:string,
  Capability:string,
  Value:dynamic
)

在此方案中,设备将发送以下示例所示的遥测消息:

{
    "applicationId": "570c2d7b-d72e-4ad1-aaf4-ad9b727daa47",
    "enqueuedTime": "1909-10-10T07:11:56.078161042Z",
    "messageSource": "telemetry",
    "telemetry": [
        {
            "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_37p:HeartRate;1",
            "name": "HeartRate",
            "value": -633994413
        },
        {
            "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_37p:RespiratoryRate;1",
            "name": "RespiratoryRate",
            "value": 1582211310
        },
        {
            "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_37p:HeartRateVariability;1",
            "name": "HeartRateVariability",
            "value": -37514094
        },
        {
            "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_37p:BodyTemperature;1",
            "name": "BodyTemperature",
            "value": 5.323322666478241e+307
        },
        {
            "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_37p:FallDetection;1",
            "name": "FallDetection",
            "value": "Earum est nobis at voluptas id qui."
        },
        {
            "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_37p:BloodPressure;1",
            "name": "BloodPressure",
            "value": {
                "Diastolic": 161438124,
                "Systolic": -966387879
            }
        }
    ],
    "device": {
        "id": "9xwhr7khkfri",
        "name": "wireless port",
        "templateId": "dtmi:hpzy1kfcbt2:umua7dplmbd",
        "templateName": "Smart Vitals Patch",
        "properties": {
            "reported": [
                {
                    "id": "dtmi:smartVitalsPatch:Smart_Vitals_Patch_wr:DeviceStatus;1",
                    "name": "DeviceStatus",
                    "value": "Id optio iste vero et neque sit."
                }
            ]
        },
        "cloudProperties": [],
        "simulated": true,
        "approved": true,
        "blocked": false,
        "provisioned": false
    }
}

以下 JQ 查询将输入消息转换为针对每个遥测值的单独输出消息。 此项转换生成与 Azure 数据资源管理器表架构匹配的输出。 该转换使用实体-属性-值架构,其中每行包含单个遥测值,遥测的名称是同一行的单独列中的值:

. as $in | .telemetry[] | {
  EnqueuedTime: $in.enqueuedTime,
  Message: $in.messageId,
  Application: $in.applicationId,
  Device: $in.device.id,
  Simulated: $in.device.simulated,
  Template: ($in.device.templateName // ""),
  Module: ($in.module // ""),
  Component: ($in.component // ""),
  Capability: .name,
  Value: .value
}

JSON 输出:

{
  "Application": "570c2d7b-d72e-4ad1-aaf4-ad9b727daa47",
  "Capability": "HeartRate",
  "Component": "",
  "Device": "9xwhr7khkfri",
  "EnqueuedTime": "1909-10-10T07:11:56.078161042Z",
  "Message": null,
  "Module": "",
  "Simulated": true,
  "Template": "Smart Vitals Patch",
  "Value": -633994413
},
{
  "Application": "570c2d7b-d72e-4ad1-aaf4-ad9b727daa47",
  "Capability": "RespiratoryRate",
  "Component": "",
  "Device": "9xwhr7khkfri",
  "EnqueuedTime": "1909-10-10T07:11:56.078161042Z",
  "Message": null,
  "Module": "",
  "Simulated": true,
  "Template": "Smart Vitals Patch",
  "Value": 1582211310
},
{
  "Application": "570c2d7b-d72e-4ad1-aaf4-ad9b727daa47",
  "Capability": "HeartRateVariability",
  "Component": "",
  "Device": "9xwhr7khkfri",
  "EnqueuedTime": "1909-10-10T07:11:56.078161042Z",
  "Message": null,
  "Module": "",
  "Simulated": true,
  "Template": "Smart Vitals Patch",
  "Value": -37514094
},
{
  "Application": "570c2d7b-d72e-4ad1-aaf4-ad9b727daa47",
  "Capability": "BodyTemperature",
  "Component": "",
  "Device": "9xwhr7khkfri",
  "EnqueuedTime": "1909-10-10T07:11:56.078161042Z",
  "Message": null,
  "Module": "",
  "Simulated": true,
  "Template": "Smart Vitals Patch",
  "Value": 5.323322666478241e+307
},
{
  "Application": "570c2d7b-d72e-4ad1-aaf4-ad9b727daa47",
  "Capability": "FallDetection",
  "Component": "",
  "Device": "9xwhr7khkfri",
  "EnqueuedTime": "1909-10-10T07:11:56.078161042Z",
  "Message": null,
  "Module": "",
  "Simulated": true,
  "Template": "Smart Vitals Patch",
  "Value": "Earum est nobis at voluptas id qui."
},
{
  "Application": "570c2d7b-d72e-4ad1-aaf4-ad9b727daa47",
  "Capability": "BloodPressure",
  "Component": "",
  "Device": "9xwhr7khkfri",
  "EnqueuedTime": "1909-10-10T07:11:56.078161042Z",
  "Message": null,
  "Module": "",
  "Simulated": true,
  "Template": "Smart Vitals Patch",
  "Value": {
      "Diastolic": 161438124,
      "Systolic": -966387879
  }
}

输出数据将导出到 Azure 数据资源管理器群集。 若要在 Power BI 中可视化导出的数据,请完成以下步骤:

  1. 安装 Power BI 应用程序。 可以从使用 Power BI Desktop 从数据中找到见解,然后采取行动下载桌面版 Power BI 应用程序。
  2. 从 GitHub 下载 Power BI Desktop IoT Central ADX Connector.pbit 文件。
  3. 使用 Power BI Desktop 应用打开在上一步骤中下载的“IoT Central ADX Connector.pbit”文件。 出现提示时,请输入前面记下的 Azure 数据资源管理器群集、数据库和表信息。

现在,可以在 Power BI 中可视化数据:

Screenshot of Power BI report that shows data from IoT Central.

后续步骤

知道如何在 IoT Central 中转换数据后,建议接下来了解如何在 IoT Central 中使用分析