question

SchneiderPatrick-3072 avatar image
1 Vote"
SchneiderPatrick-3072 asked bhargaviannadevara-msft edited

Python Web App read and display IoT Hub - Data

Hello,

I am using a Python web app and would like to use it to display data from an IoT device. The web app is based on Flask and is supplied with data via the IoT Hub. The integrated built-in endpoint event hub is used for this purpose.

To send the data, I use the Raspberry Pi online simulator and the corresponding Azure tutorial:
https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-raspberry-pi-web-simulator-get-started

I used the Azure tutorial to read the IoT device data sent to the Event Hub:
https://github.com/Azure-Samples/azure-iot-samples-python/blob/master/iot-hub/Quickstarts/read-d2c-messages/read_device_to_cloud_messages_async.py

Code:

 from web import create_app
    
 import asyncio
 from azure.eventhub import TransportType
 from azure.eventhub.aio import EventHubConsumerClient # This is async API. For sync API, remove ".aio" 
    
 async def on_event_batch(partition_context, events):
     for event in events:
         print("Received event from partition: {}.".format(partition_context.partition_id))
         print("Telemetry received: ", event.body_as_str())
         print("Properties (set by device): ", event.properties)
         print("System properties (set by IoT Hub): ", event.system_properties)
         print()
     await partition_context.update_checkpoint()
    
 async def on_error(partition_context, error):
     # Put your code here. partition_context can be None in the on_error callback.
     if partition_context:
         print("An exception: {} occurred during receiving from Partition: {}.".format(
             partition_context.partition_id,
             error
         ))
     else:
         print("An exception: {} occurred during the load balance process.".format(error))
    
 app = create_app()
    
 if __name__ == "__main__":
     ## Connection String
     CONNECTION_STRING = "Endpoint=sb://XXXXXX.servicebus.windows.net/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXX;EntityPath=XXXXXXX"
     CONSUMER_GROUP = "consumergroup-002"
    
     loop = asyncio.get_event_loop()
     client = EventHubConsumerClient.from_connection_string(
         conn_str=CONNECTION_STRING,
         consumer_group=CONSUMER_GROUP,
         # transport_type=TransportType.AmqpOverWebsocket,  # uncomment it if you want to use web socket
         # http_proxy={  # uncomment if you want to use proxy 
         #     'proxy_hostname': '127.0.0.1',  # proxy hostname.
         #     'proxy_port': 3128,  # proxy port.
         #     'username': '<proxy user name>',
         #     'password': '<proxy password>'
         # }
     )
    
     try:
         loop.run_until_complete(client.receive_batch(on_event_batch=on_event_batch, on_error=on_error))
     except KeyboardInterrupt:
         print("Receiving has stopped.")
     finally:
         loop.run_until_complete(client.close())
         loop.stop()
        
     app.run() #debug=True)

Reading the data works without any problems.

Console:
Received event from partition: 0.
Telemetry received: {"messageId":3,"deviceId":"Raspberry Pi Web Client","temperature":29.265454533925492,"humidity":66.26590975230917}
Properties (set by device): {b'temperatureAlert': b'false'}
System properties (set by IoT Hub): {b'iothub-connection-device-id': b'IoTDevice002', b'iothub-connection-auth-method': b'{"scope":"device","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}', b'iothub-connection-auth-generation-id': b'637690248883355300', b'iothub-enqueuedtime': 1633677358912, b'iothub-message-source': b'Telemetry', b'x-opt-sequence-number': 4787, b'x-opt-offset': b'94489779872', b'x-opt-enqueued-time': 1633677359163}

However, the call "loop.run_until_complete(client.receive_batch(on_event_batch=on_event_batch, on_error=on_error))" works blocking.

Thus, the web app is not started at any time. Only after finishing the read operation with the keyboard interrupt the web app starts. However, the execution should happen at the same time (read operation that stores data in variables and web app that uses these variables e.g. to display a graph). Running the web app or the IoT Hub read operation in another thread does not work either.

Is there any way to implement this functionality?

With kind regards
Patrick









azure-iot-hub
· 2
5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.

Hello,

this is not what I want to implement. I want to read the device-to-cloud messages and not implement the cloud-to-device communication. Reading the device-to-cloud messages in principle is also not the problem, but reading the device-to-cloud messages specifically in the web app.

This tutorial:
https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-live-data-visualization-in-web-apps
is exactly the functionality I want to implement. Only in a Python web app and not in a C# web app.

0 Votes 0 ·

1 Answer

Yunhao avatar image
1 Vote"
Yunhao answered

Hello,

The EventHubConsumerClient.receive_batch is designed to be a blocking call which is based on push mode. So run_until_complete is expected to be blocking the execution in your code.

An easy fix would just be switching the execution order between your web app the eh receiving and run in web app in a thread like the following:


thread = threading.Thread(target=app.run)
thread.start()
loop.run_until_complete(client.receive_batch(on_event_batch=on_event_batch, on_error=on_error))
thread.join()


another option would be moving to use the sync version EventHubConsumerClient and wrap receiving into a thread.

Please let me know whether it could help you out of the situation and any questions you have.

5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.