Hi @Athira Gopinath(UST,IN), greetings! Thank you for posting the question here in the community. To route messages based on the message body, you must first add the property 'contentType' (ct) to the end of the MQTT topic and set its value to application/json;charset=utf-8, as shown in the following example.
devices/{device-id}/messages/events/$.ct=application%2Fjson%3Bcharset%3Dutf-8
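The encoded value at the end of the topic is simply the percent-encoded form of application/json;charset=utf-8. A minimal sketch of how such a topic could be built; the helper name build_telemetry_topic and the device ID "my-device" are placeholders for illustration, not part of any SDK:

```python
from urllib.parse import quote


def build_telemetry_topic(device_id: str,
                          content_type: str = "application/json;charset=utf-8") -> str:
    """Return the device-to-cloud topic with the content type appended as $.ct.

    Hypothetical helper for illustration only.
    """
    # safe="" makes quote() percent-encode '/', ';' and '=' as well
    encoded_ct = quote(content_type, safe="")
    return f"devices/{device_id}/messages/events/$.ct={encoded_ct}"


# "my-device" is a placeholder device ID
print(build_telemetry_topic("my-device"))
```

Building the topic this way avoids hand-typing the escape sequences, which is an easy place to introduce a typo that silently disables body-based routing.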
The IoT Hub documentation provides sample Python code demonstrating how to use the MQTT protocol. Please refer to the article "Use the MQTT protocol directly". Below is the code I used to publish data from a device to IoT Hub with the Paho MQTT client for Python.
from paho.mqtt import client as mqtt
import time
import ssl
import base64
import hmac
import hashlib
import urllib.parse  # import the submodule explicitly; urllib.parse.quote is used below
path_to_root_cert = "<local path to digicert.cer>"
device_id = "<Device ID>"
endpoint = "<IoT Hub Name>.azure-devices.net/devices/<Device ID>"
deviceKey = "<Device Primary Key>"
def generateSasToken(resource_uri, signing_key, expires_in_mins):
    # Set expiration in seconds
    expires = int(time.time() + expires_in_mins * 60)
    # Use urllib to encode resourceUri
    encoded_resource_uri = urllib.parse.quote(resource_uri, '')
    # Construct string to sign from joined encoded resourceUri and expiration
    to_sign = encoded_resource_uri + '\n' + str(expires)
    # Sign the string with the signing_key
    signed_hmac_sha256 = hmac.HMAC(base64.b64decode(signing_key), to_sign.encode('utf-8'), hashlib.sha256)
    signature = urllib.parse.quote(base64.b64encode(signed_hmac_sha256.digest()).decode('utf-8'))
    # Construct authorization string
    return "SharedAccessSignature sr={}&sig={}&se={}".format(encoded_resource_uri, signature, expires)
sas_token = generateSasToken(endpoint, deviceKey, 60)
iot_hub_name = "<IoT Hub Name>"
def on_connect(client, userdata, flags, rc):
    print("Device connected with result code: " + str(rc))

def on_disconnect(client, userdata, rc):
    print("Device disconnected with result code: " + str(rc))

def on_publish(client, userdata, mid):
    print("Device sent message")
# paho-mqtt 1.x constructor; paho-mqtt 2.x additionally requires a callback_api_version argument
client = mqtt.Client(client_id=device_id, protocol=mqtt.MQTTv311)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
client.username_pw_set(username=iot_hub_name + ".azure-devices.net/" +
                       device_id + "/?api-version=2021-04-12", password=sas_token)
client.tls_set(ca_certs=path_to_root_cert, certfile=None, keyfile=None,
               cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
client.tls_insecure_set(False)
client.connect(iot_hub_name + ".azure-devices.net", port=8883)
# Start the network loop in a background thread so callbacks fire and keepalives are sent
client.loop_start()
final_payload = '{"datetime": "2023-03-30 06:20:30", "RaR": 123, "level": 750, "id": "1234"}'
while True:
    print("Publishing data")
    #client.publish("testtopic", '{"id":123}', qos=1)
    client.publish("devices/" + device_id + "/messages/events/$.ct=application%2Fjson%3Bcharset%3Dutf-8", final_payload, qos=1)
    time.sleep(10)
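Body-based routing only works if the payload is valid JSON, so it can be safer to build the message with json.dumps than to type the string by hand. A minimal sketch, assuming the same field names as the payload above; the function name build_payload is hypothetical:

```python
import json
from datetime import datetime, timezone


def build_payload(rar: int, level: int, sensor_id: str, when: datetime) -> str:
    """Serialize one telemetry reading as a JSON string (illustrative helper)."""
    body = {
        "datetime": when.strftime("%Y-%m-%d %H:%M:%S"),
        "RaR": rar,
        "level": level,
        "id": sensor_id,
    }
    # json.dumps guarantees correct quoting and escaping of the values
    return json.dumps(body)


payload = build_payload(123, 750, "1234",
                        datetime(2023, 3, 30, 6, 20, 30, tzinfo=timezone.utc))
print(payload)
```

The resulting string can be passed to client.publish in place of a hand-written payload.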
Please make sure to provide the correct IoT Hub name, device ID, and device key in the variables device_id, endpoint, deviceKey, and iot_hub_name.
- <local path to digicert.cer> is the path to a local file that contains the DigiCert Baltimore Root certificate. You can create this file by copying the certificate information from certs.c in the Azure IoT SDK for C. Include the lines -----BEGIN CERTIFICATE----- and -----END CERTIFICATE-----, remove the " marks at the beginning and end of every line, and remove the \r\n characters at the end of every line.
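The manual clean-up of the certificate text can also be scripted. The sketch below assumes the copied text consists of C string literal lines, each wrapped in " marks and ending with a literal \r\n, as described above; the function name c_string_lines_to_pem is hypothetical and the sample body is placeholder data, not a real certificate:

```python
def c_string_lines_to_pem(c_source: str) -> str:
    """Turn quoted C string lines into plain PEM text (illustrative helper)."""
    pem_lines = []
    for raw in c_source.splitlines():
        # Drop surrounding whitespace and a trailing ';' if the last line has one
        line = raw.strip().rstrip(";").strip()
        if not line.startswith('"'):
            continue  # ignore anything that is not a quoted string line
        line = line.strip('"')              # remove the " marks
        line = line.replace("\\r\\n", "")   # remove the literal \r\n escape
        if line:
            pem_lines.append(line)
    return "\n".join(pem_lines) + "\n"


# Placeholder input in the assumed format (NOT a real certificate)
sample = r'''
"-----BEGIN CERTIFICATE-----\r\n"
"QUJDREVGR0g=\r\n"
"-----END CERTIFICATE-----\r\n"
'''

pem_text = c_string_lines_to_pem(sample)
print(pem_text)
```

The returned text can then be written to the file referenced by path_to_root_cert.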
Once you have the code running, you should see that the data routed to a storage endpoint is in JSON format. The image below, captured from the blob storage endpoint, shows the data in the correct format.
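To check the routed output programmatically, one option is to read a record from the blob and look at its body. The sketch below is based on assumptions: it supposes each record is a JSON object with a "Body" field, and that a body sent without the JSON content type shows up as a base64 string instead of a JSON object. The function name extract_body and the sample records are made up for illustration:

```python
import base64
import json


def extract_body(record_line: str) -> dict:
    """Return the message body of one routed record as a dict (illustrative helper)."""
    record = json.loads(record_line)
    body = record["Body"]  # assumed field name
    if isinstance(body, str):
        # Assumed case: content type was not set, so the body is base64 text
        return json.loads(base64.b64decode(body))
    # Body is already a JSON object: content type and encoding were applied
    return body


# Made-up sample records in the assumed layout
json_record = json.dumps({"Properties": {}, "Body": {"level": 750}})
b64_record = json.dumps({
    "Properties": {},
    "Body": base64.b64encode(b'{"level": 750}').decode("ascii"),
})

print(extract_body(json_record))
print(extract_body(b64_record))
```

If the first branch is taken for your data, the $.ct property on the topic is probably missing or mistyped.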
Please let us know in the comments below if you have any additional questions or run into any issues as you implement the code. We would be glad to help you further. If the response helped, please click Accept Answer and Yes. Doing so helps other community members with a similar issue identify the solution. I highly appreciate your contribution to the community.