Python Web App read and display IoT Hub - Data

Schneider, Patrick 6 Reputation points
2021-10-08T07:53:51.267+00:00

Hello,

I am using a Python web app and would like to use it to display data from an IoT device. The web app is based on Flask and is supplied with data via the IoT Hub. The integrated built-in endpoint event hub is used for this purpose.

To send the data, I use the Raspberry Pi online simulator and the corresponding Azure tutorial:
https://learn.microsoft.com/en-us/azure/iot-hub/iot-hub-raspberry-pi-web-simulator-get-started

I used the Azure tutorial to read the IoT device data sent to the Event Hub:
https://github.com/Azure-Samples/azure-iot-samples-python/blob/master/iot-hub/Quickstarts/read-d2c-messages/read_device_to_cloud_messages_async.py

Code:

from web import create_app  
  
import asyncio  
from azure.eventhub import TransportType  
from azure.eventhub.aio import EventHubConsumerClient # This is async API. For sync API, remove ".aio"   
  
async def on_event_batch(partition_context, events):  
    for event in events:  
        print("Received event from partition: {}.".format(partition_context.partition_id))  
        print("Telemetry received: ", event.body_as_str())  
        print("Properties (set by device): ", event.properties)  
        print("System properties (set by IoT Hub): ", event.system_properties)  
        print()  
    await partition_context.update_checkpoint()  
  
async def on_error(partition_context, error):  
    # Put your code here. partition_context can be None in the on_error callback.  
    if partition_context:  
        print("An exception: {} occurred during receiving from Partition: {}.".format(  
            partition_context.partition_id,  
            error  
        ))  
    else:  
        print("An exception: {} occurred during the load balance process.".format(error))  
  
app = create_app()  
  
if __name__ == "__main__":  
    ## Connection String  
    CONNECTION_STRING = "Endpoint=sb://XXXXXX.servicebus.windows.net/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXX;EntityPath=XXXXXXX"  
    CONSUMER_GROUP = "consumergroup-002"  
  
    loop = asyncio.get_event_loop()  
    client = EventHubConsumerClient.from_connection_string(  
        conn_str=CONNECTION_STRING,  
        consumer_group=CONSUMER_GROUP,  
        # transport_type=TransportType.AmqpOverWebsocket,  # uncomment it if you want to use web socket  
        # http_proxy={  # uncomment if you want to use proxy   
        #     'proxy_hostname': '127.0.0.1',  # proxy hostname.  
        #     'proxy_port': 3128,  # proxy port.  
        #     'username': '<proxy user name>',  
        #     'password': '<proxy password>'  
        # }  
    )  
  
    try:  
        loop.run_until_complete(client.receive_batch(on_event_batch=on_event_batch, on_error=on_error))  
    except KeyboardInterrupt:  
        print("Receiving has stopped.")  
    finally:  
        loop.run_until_complete(client.close())  
        loop.stop()  
      
    app.run() #debug=True)  

Reading the data works without any problems.

Console:
Received event from partition: 0.
Telemetry received: {"messageId":3,"deviceId":"Raspberry Pi Web Client","temperature":29.265454533925492,"humidity":66.26590975230917}
Properties (set by device): {b'temperatureAlert': b'false'}
System properties (set by IoT Hub): {b'iothub-connection-device-id': b'IoTDevice002', b'iothub-connection-auth-method': b'{"scope":"device","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}', b'iothub-connection-auth-generation-id': b'637690248883355300', b'iothub-enqueuedtime': 1633677358912, b'iothub-message-source': b'Telemetry', b'x-opt-sequence-number': 4787, b'x-opt-offset': b'94489779872', b'x-opt-enqueued-time': 1633677359163}

However, the call "loop.run_until_complete(client.receive_batch(on_event_batch=on_event_batch, on_error=on_error))" works blocking.

Thus, the web app is not started at any time. Only after finishing the read operation with the keyboard interrupt the web app starts. However, the execution should happen at the same time (read operation that stores data in variables and web app that uses these variables e.g. to display a graph). Running the web app or the IoT Hub read operation in another thread does not work either.

Is there any way to implement this functionality?

With kind regards
Patrick

Azure IoT Hub
Azure IoT Hub
An Azure service that enables bidirectional communication between internet of things (IoT) devices and applications.
1,272 questions
{count} vote

1 answer

Sort by: Most helpful
  1. Yunhao 6 Reputation points
    2021-10-14T22:15:05.907+00:00

    Hello,

    The EventHubConsumerClient.receive_batch is designed to be a blocking call which is based on push mode. So run_until_complete is expected to be blocking the execution in your code.

    An easy fix would just be switching the execution order between your web app the eh receiving and run in web app in a thread like the following:

    thread = threading.Thread(target=app.run)
    thread.start()
    loop.run_until_complete(client.receive_batch(on_event_batch=on_event_batch, on_error=on_error))
    thread.join()

    another option would be moving to use the sync version EventHubConsumerClient and wrap receiving into a thread.

    Please let me know whether it could help you out of the situation and any questions you have.

    1 person found this answer helpful.
    0 comments No comments

Your answer

Answers can be marked as Accepted Answers by the question author, which helps users to know the answer solved the author's problem.