Hello,
I have a Python web app, based on Flask, that I would like to use to display data from an IoT device. The data reaches the app through an Azure IoT Hub, using the hub's built-in Event Hub-compatible endpoint.
To send the data, I use the Raspberry Pi online simulator and the corresponding Azure tutorial:
https://learn.microsoft.com/en-us/azure/iot-hub/iot-hub-raspberry-pi-web-simulator-get-started
To read the IoT device data sent to the Event Hub endpoint, I used the following Azure sample:
https://github.com/Azure-Samples/azure-iot-samples-python/blob/master/iot-hub/Quickstarts/read-d2c-messages/read_device_to_cloud_messages_async.py
Code:
from web import create_app
import asyncio
from azure.eventhub import TransportType
from azure.eventhub.aio import EventHubConsumerClient # This is async API. For sync API, remove ".aio"
async def on_event_batch(partition_context, events):
    for event in events:
        print("Received event from partition: {}.".format(partition_context.partition_id))
        print("Telemetry received: ", event.body_as_str())
        print("Properties (set by device): ", event.properties)
        print("System properties (set by IoT Hub): ", event.system_properties)
        print()
    # Record progress once the whole batch has been handled.
    await partition_context.update_checkpoint()
async def on_error(partition_context, error):
    # Put your code here. partition_context can be None in the on_error callback.
    if partition_context:
        # Argument order matches the placeholders: the error first, then the partition ID.
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            error,
            partition_context.partition_id
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))
app = create_app()
if __name__ == "__main__":
    # Connection string
    CONNECTION_STRING = "Endpoint=sb://XXXXXX.servicebus.windows.net/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXX;EntityPath=XXXXXXX"
    CONSUMER_GROUP = "consumergroup-002"
    loop = asyncio.get_event_loop()
    client = EventHubConsumerClient.from_connection_string(
        conn_str=CONNECTION_STRING,
        consumer_group=CONSUMER_GROUP,
        # transport_type=TransportType.AmqpOverWebsocket,  # uncomment it if you want to use web socket
        # http_proxy={  # uncomment if you want to use proxy
        #     'proxy_hostname': '127.0.0.1',  # proxy hostname.
        #     'proxy_port': 3128,  # proxy port.
        #     'username': '<proxy user name>',
        #     'password': '<proxy password>'
        # }
    )
    try:
        # This call blocks until receiving is interrupted.
        loop.run_until_complete(client.receive_batch(on_event_batch=on_event_batch, on_error=on_error))
    except KeyboardInterrupt:
        print("Receiving has stopped.")
    finally:
        loop.run_until_complete(client.close())
        loop.stop()
    # Only reached after the receive loop above has ended.
    app.run()  # debug=True
Reading the data works without any problems.
Console:
Received event from partition: 0.
Telemetry received: {"messageId":3,"deviceId":"Raspberry Pi Web Client","temperature":29.265454533925492,"humidity":66.26590975230917}
Properties (set by device): {b'temperatureAlert': b'false'}
System properties (set by IoT Hub): {b'iothub-connection-device-id': b'IoTDevice002', b'iothub-connection-auth-method': b'{"scope":"device","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}', b'iothub-connection-auth-generation-id': b'637690248883355300', b'iothub-enqueuedtime': 1633677358912, b'iothub-message-source': b'Telemetry', b'x-opt-sequence-number': 4787, b'x-opt-offset': b'94489779872', b'x-opt-enqueued-time': 1633677359163}
However, the call "loop.run_until_complete(client.receive_batch(on_event_batch=on_event_batch, on_error=on_error))" is blocking.
As a result, the web app never starts while data is being received. It only starts after I stop the read operation with a keyboard interrupt. Both should run at the same time: the read operation stores the data in variables, and the web app uses these variables, e.g. to display a graph. Running the web app or the IoT Hub read operation in another thread does not work either.
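The arrangement described here (a receiver that keeps storing the latest data while the main thread stays free for the web app) can be sketched with the standard library alone. This is only an illustration under assumptions, not the Azure code: fake_receive is a hypothetical stand-in for client.receive_batch, and latest, read_latest and start_background_receiver are made-up names. The coroutine runs on its own event loop inside a worker thread, created by asyncio.run, because asyncio.get_event_loop() cannot be relied on to provide a loop outside the main thread.

```python
import asyncio
import threading
import time

# Shared state: the receiver writes here, the web app would read from here.
latest = {}
latest_lock = threading.Lock()


async def fake_receive(stop_event):
    """Hypothetical stand-in for client.receive_batch(...); produces fake telemetry."""
    message_id = 0
    while not stop_event.is_set():
        message_id += 1
        with latest_lock:
            latest["messageId"] = message_id
            latest["temperature"] = 20.0 + message_id
        await asyncio.sleep(0.01)


def start_background_receiver(stop_event):
    """Run the receiver coroutine on its own event loop in a daemon thread."""
    def runner():
        # asyncio.run creates and closes a fresh event loop for this thread.
        asyncio.run(fake_receive(stop_event))

    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    return thread


def read_latest():
    """Return a consistent copy of the most recent reading."""
    with latest_lock:
        return dict(latest)


if __name__ == "__main__":
    stop = threading.Event()
    worker = start_background_receiver(stop)
    # The main thread is not blocked; in the real app, app.run() would go here.
    time.sleep(0.1)
    print("Latest reading:", read_latest())
    stop.set()
    worker.join()
```

In the real app, a Flask view would call something like read_latest() to get the data for the graph. Whether the same pattern works unchanged with EventHubConsumerClient is an assumption that this sketch does not verify.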
Is there any way to implement this functionality?
With kind regards
Patrick