Split my IoT Device messages (JSON) by Sensor

Christian Sonderstrup 21 Reputation points Microsoft Employee
2021-01-14T21:30:38.417+00:00

I have 5 IoT sensors that measure temperature+humidity. Each sensor has a MAC address and they relay their measurements via Bluetooth to my IoT Gateway (a physical device). I have successfully managed to connect the IoT Gateway to Microsoft Azure IoT Hub, where I received messages (JSON) at a predefined interval. In each message (JSON) are the measurements from my 5 IoT sensors. Each measurement in the JSON is distinguishable by the MAC address (see below). However, in IoT Hub they appear as 1 IoT Device and not 5. As each of the IoT Sensors is connected to the Gateway via Bluetooth, I cannot just create them in IoT Hub as 5 devices, the way I create the gateway.
I clearly want to monitor, aggregate, average, apply AI etc. for each sensor individually, as each sensor pertains to a different room that I am monitoring.

Below is an example of the message/JSON that is received at the IoT Hub (edited to show only 2 sensors).

My question is: How do I best split these messages so that they represent different sensors?
As far as I can tell, there are two approaches.

  1. Split the JSON with a function within IoT Hub (I have no idea on how to do this in practice).
  2. Split the measurements into different messages representing each IoT sensor at the IoT Gateway BEFORE they are sent over the internet to IoT Hub (again, - I have no idea on how to do this in practice).
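Approach 2 could look something like the following sketch, assuming the gateway logic can run Python; `split_by_sensor` is a hypothetical helper name. It fans the batched JSON array out into one message per MAC address:

```python
import json

def split_by_sensor(payload: str) -> dict[str, str]:
    """Split a gateway payload (a JSON array of readings) into one
    JSON message per sensor, keyed by MAC address. Assumes one
    reading per sensor in the batch."""
    readings = json.loads(payload)
    return {r["mac"]: json.dumps(r) for r in readings}

# Trimmed-down version of the example message further below:
payload = json.dumps([
    {"mac": "AC233FA293E9", "temperature": 23.66, "humidity": 42.02},
    {"mac": "AC233FA29290", "temperature": 21.13, "humidity": 58.41},
])
messages = split_by_sensor(payload)
```

Each resulting string could then be sent as its own device-to-cloud message, e.g. with the MAC in an application property so IoT Hub routing can act on it.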

An informed opinion on which of the two approaches is better is greatly appreciated. So is any indication of how to do it in practice.

What I do understand is that if all the sensor readings are lumped into one message/JSON, I economize on the message count in IoT Hub. However, I then need to pay for more compute to split the message, and potentially also for more storage. I would be happy for the measurements to be aggregated/averaged over longer time intervals for each IoT sensor first, and then sent as separate messages/JSONs to IoT Hub at a lower frequency. (Again, I have no idea how to do this in practice... but guidance/hints would be welcome.)
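The aggregate-then-send idea above could be sketched like this (a minimal illustration; `aggregate_by_sensor` is a hypothetical name, and a real gateway may not run Python):

```python
import json
from collections import defaultdict
from statistics import mean

def aggregate_by_sensor(readings: list[dict]) -> list[str]:
    """Average buffered readings per MAC and emit one summary
    message per sensor."""
    buckets: dict[str, list[dict]] = defaultdict(list)
    for r in readings:
        buckets[r["mac"]].append(r)
    return [
        json.dumps({
            "mac": mac,
            "count": len(rs),
            "avg_temperature": round(mean(r["temperature"] for r in rs), 2),
            "avg_humidity": round(mean(r["humidity"] for r in rs), 2),
        })
        for mac, rs in buckets.items()
    ]

# Two readings buffered for one sensor, one for another:
readings = [
    {"mac": "AC233FA293E9", "temperature": 23.0, "humidity": 40.0},
    {"mac": "AC233FA293E9", "temperature": 24.0, "humidity": 42.0},
    {"mac": "AC233FA29290", "temperature": 21.0, "humidity": 58.0},
]
summaries = aggregate_by_sensor(readings)
```

Sending one such summary per sensor per interval keeps the message count low while still giving IoT Hub per-sensor messages.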


Example Message/JSON

[IoTHubMonitor] [5:45:19 PM] Message received from [ac55gw]:
[
  {
    "timestamp": "2021-01-14T17:44:52.449Z",
    "type": "S1",
    "mac": "AC233FA293E9",
    "bleName": "",
    "rssi": -74,
    "battery": 100,
    "temperature": 23.66,
    "humidity": 42.02
  },
  {
    "timestamp": "2021-01-14T17:44:53.081Z",
    "type": "S1",
    "mac": "AC233FA29290",
    "bleName": "",
    "rssi": -73,
    "battery": 100,
    "temperature": 21.13,
    "humidity": 58.41
  }
]

Accepted answer
  1. Sander van de Velde | MVP 36,766 Reputation points MVP Volunteer Moderator
    2021-01-28T14:16:53.08+00:00

    This is a common yet interesting case.

There is no short answer to this question. Some 'parameters' are missing (like decisions already made, or the ingestion frequency). I will try to give a number of approaches you could consider.

    First, you have an IoT Gateway, connected to the cloud. It seems to be registered as an IoT Device.

Azure IoT also supports the concept of IoT Edge devices. There, adding custom logic on the edge to send custom messages is easy. It also supports the concept of Identity Translation, where multiple IoT device registrations are handled by one edge device.

Due to the requirements of the IoT Edge runtime (e.g., running Docker containers), this means introducing a new device (e.g., a Raspberry Pi or an industrial PC) between the IoT Gateway and the cloud (an IoT Edge gateway).

    The other solution is splitting the incoming messages AFTER the IoT Hub.

IoT Hub has functionality for routing messages to different endpoints (other Azure resources), and it can enrich messages with static values (like device tags with make, type, or customer).

    Splitting is usually done with:

    Azure Stream Analytics

    An Azure function

    The output of these pieces of logic is then sent to some 'message bus' like a message Event Hub where other services can pick up the flow or some persisted storage is filled with the new messages.
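For the Azure Function route, the core splitting logic (minus the Event Hubs trigger and output bindings a deployed function would declare) could be as small as this sketch; `fan_out` is a hypothetical name:

```python
import json

def fan_out(event_body: bytes) -> list[str]:
    """Turn one batched gateway message (a JSON array) into one
    event per sensor reading, ready to forward to an output
    binding such as an Event Hub."""
    readings = json.loads(event_body)
    return [json.dumps(r) for r in readings]

# A batched message as it would arrive from the IoT Hub endpoint:
batch = b'[{"mac": "AC233FA293E9", "temperature": 23.66}, {"mac": "AC233FA29290", "temperature": 21.13}]'
events = fan_out(batch)
```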

An Azure Function is interesting on the Consumption plan, where a large number of executions are free.

Azure Stream Analytics is very flexible. If you are familiar with T-SQL, it is a great way to handle data in flight.

    There is no free tier for Azure Stream Analytics, though.

    So it depends on the number of messages being sent to the cloud.

Keep in mind that IoT Hub ingest is actually metered in chunks (4 KB blocks on the paid tiers). So the messages sent cannot grow without consequences.

This can be compensated for by compressing (deflating) messages (are you in control of the logic running on that IoT Gateway?), which are then decompressed (inflated) on the IoT Hub input (see the event compression type).
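As a back-of-the-envelope check on that metering (assuming the 4 KB block size of the paid tiers; the free tier uses a smaller block):

```python
import math

def metered_messages(payload_bytes: int, block_bytes: int = 4096) -> int:
    """How many messages IoT Hub counts for one transmission,
    assuming paid-tier 4 KB metering blocks."""
    return max(1, math.ceil(payload_bytes / block_bytes))

# One batched message with 5 readings of ~200 bytes each still
# counts as a single metered message:
small = metered_messages(5 * 200)    # -> 1
# but a large batch crosses block boundaries:
large = metered_messages(10_000)     # -> 3
```

So a small batch of 5 sensor readings in one message costs the same as a single-sensor message, which is why batching economizes on the message quota.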

Still, scaling up IoT Hub is likely cheaper than adding that extra logic.

    Conclusion: There are more angles to this story, but for now, if you are able to switch to an IoT Edge device with the open-source IoT Edge runtime, you do not need to pay for the logic running in the cloud.

    I recommend checking out the MS Learn training material about Azure IoT (Edge).

    2 people found this answer helpful.

1 additional answer

Sort by: Most helpful
  1. Christian 6 Reputation points
    2021-02-01T23:18:50.2+00:00

    How the solution was practically implemented

    The issue of splitting my incoming JSON messages by MAC-address/sensor was solved using Azure Stream Analytics.

This YouTube tutorial by Adam Marczak was extremely helpful (https://youtu.be/NbGmyjgY0pU) in explaining how to set up INPUTS and OUTPUTS for the Azure Stream Analytics job to run.

The first, bare-bones functioning solution was the following SQL query in Azure Stream Analytics.
To help future novices along, let me explain what it does.
Ironically, the key to the splitting is the last section, GROUP BY. This section splits all the objects within the JSON message that reached IoT Hub by MAC address/individual sensor, which is what I wanted.

Further, as this is a continuous stream of incoming data, "GROUP BY TumblingWindow(second, 30)" groups the data into fixed, non-overlapping 30-second windows for processing.

In my case I have 2-6 measurements/messages per sensor in any given 30-second window.

The first section of the SQL, SELECT, picks the fields/values that I want to use/store. For the temperature I calculate the average, min, and max values. I also select the MAC, so I can distinguish the measurements later, and insert a System.Timestamp(), which becomes the normalized timestamp across my sensors. I apply the rounding function to eliminate excess decimals.

The INTO section specifies where I want the Stream Analytics output to go. I eventually set it up to output into both Blob Storage and Power BI.

The FROM section specifies where Stream Analytics can find the incoming stream. See the YouTube video linked above for the details on that.

    This worked...


    SELECT 
            mac,
            System.TimeStamp() AS Timestamp,
            round(avg(temperature),2) as AVG_Temperature,
            round(max(temperature),1) as MAX_Temperature,
            round(min(temperature),1) as MIN_Temperature,
        round(avg(humidity),2) as AVG_Humidity
        INTO
            [My-Streaming-Output]
        FROM
            [My-Streaming-Input]
        GROUP BY
            mac,
            TumblingWindow(second, 30) 
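The effect of `GROUP BY mac, TumblingWindow(second, 30)` can be mimicked in plain Python to see why it splits the stream per sensor (a simulation of the semantics only, not how Stream Analytics is implemented):

```python
from collections import defaultdict
from statistics import mean

def tumbling_avg(events, window_s=30):
    """Bucket events into fixed, non-overlapping windows per MAC
    (like GROUP BY mac, TumblingWindow) and average temperature."""
    groups = defaultdict(list)
    for e in events:
        window = int(e["t"] // window_s)      # which 30 s window
        groups[(e["mac"], window)].append(e["temperature"])
    return {k: round(mean(v), 2) for k, v in groups.items()}

events = [
    {"t": 5,  "mac": "AC233FA293E9", "temperature": 23.0},
    {"t": 12, "mac": "AC233FA293E9", "temperature": 24.0},
    {"t": 8,  "mac": "AC233FA29290", "temperature": 21.0},
    {"t": 35, "mac": "AC233FA293E9", "temperature": 25.0},
]
result = tumbling_avg(events)
# One output row per (sensor, window): the two sensors are split
# apart even though their readings arrived in the same stream.
```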
    

Eventually I added more functionality to it and now run a battery of sequential SQLs and pull in a Reference Input (aka lookup table), so that my MAC address from the living room can be translated to "Living Room". The following is the full sequence of SQLs as currently implemented.

SQL 1: Does the same as above but takes in more variables, such as signal strength and battery, and counts the number of measurements in the 30-second window.
SQL 2: This SQL piece does another GROUP BY on top of SQL 1. I use it to count how many sensors sent data in the previous 30 seconds. One sensor is far from the Gateway and its Bluetooth signal does not always reach it, which causes the sensor to drop off the stream. I use this query to monitor how many of my sensors I am getting data from.
SQL 3: Pulls in the Reference Input/lookup data and joins it to the incoming stream. NOTE - Azure Stream Analytics does not yet support Preview for Reference Inputs, so you won't know if it works until you start running the Stream Analytics job. (That took me 3 hours to learn.)

Please observe that the first three queries all sit within a WITH clause (common table expressions), which allows me to reuse the query outputs.

SQLs 4-6 send the data to their respective outputs: 1x Blob Storage and 2x Power BI.

    I am sure I can further refine all of this for elegance, efficiency, resilience and cost, - and I will. But for now I rejoice in the fact that it works. I hope this will help others too.

    SQL 1

    WITH 
        ReaderQuery AS (
        SELECT 
            mac,
            System.TimeStamp() AS Timestamp,
            round(avg(temperature),2) as AVG_Temperature,
            round(max(temperature),1) as MAX_Temperature,
            round(min(temperature),1) as MIN_Temperature,
            round(avg(humidity),2) as AVG_Humidity,
            round(avg(rssi),1) as Signal_Strength,
            count(mac) as Pings,
            min(battery) as Battery
        FROM
            [AC55-Gateway-Stream]
        GROUP BY
            mac,
            TumblingWindow(second, 30) 
        ),
    

    SQL 2

    Alive as (
        SELECT 
            count(DISTINCT mac) AS Sensors_Online
        FROM
            ReaderQuery
        GROUP BY
            TumblingWindow(second, 30) 
    ),
    

    SQL 3

        MergedData AS (
        SELECT
            Stream_Input.*, 
            Reference_Input.*
        FROM    
            ReaderQuery Stream_Input
        LEFT OUTER JOIN [MAC-Input] Reference_Input ON Stream_Input.mac = Reference_Input.mac
    )
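The reference-data join in SQL 3 behaves like a per-MAC dictionary lookup; this small Python analogy (hypothetical names and data) shows the LEFT OUTER JOIN semantics:

```python
def left_join_rooms(stream_rows, rooms):
    """Attach a room name per MAC, keeping rows with no match --
    the behaviour of a LEFT OUTER JOIN against the reference input."""
    return [{**row, "room": rooms.get(row["mac"])} for row in stream_rows]

rooms = {"AC233FA293E9": "Living Room"}             # reference input
rows = [{"mac": "AC233FA293E9", "AVG_Temperature": 23.5},
        {"mac": "AC233FA29290", "AVG_Temperature": 21.1}]
joined = left_join_rooms(rows, rooms)
# Unmatched MACs keep their row with room = None, just as the
# LEFT OUTER JOIN keeps unmatched stream rows with NULLs.
```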
    

    SQL 4

    SELECT
        *
    INTO
        [AC55-Streaming-Blob]
    FROM    
        MergedData
    

    SQL 5

    SELECT
        *
    INTO
        [AC55-Streaming-PowerBI]
    FROM    
        MergedData
    

    SQL 6

    SELECT
        *,
        System.TimeStamp() AS Timestamp
    INTO
        [AC55-Streaming-PowerBi-KPIs]
    FROM    
        Alive
    
    1 person found this answer helpful.
