Ciao @Andrea Foti,
Per leggere gli header di un messaggio MQTT su Azure IoT Hub, puoi seguire questo approccio:
- Proprietà di sistema: Azure IoT Hub include proprietà di sistema negli header dei messaggi MQTT. Queste proprietà forniscono metadati sul messaggio, come l'ID del dispositivo, l'ID del messaggio e il timestamp. Puoi accedere a queste proprietà durante l'elaborazione del messaggio.
- Proprietà applicative personalizzate: Puoi definire proprietà applicative personalizzate negli header dei messaggi. Si tratta di coppie chiave-valore che puoi impostare quando invii un messaggio dal tuo dispositivo.
- Utilizzo degli SDK di Azure IoT: Se utilizzi gli SDK di Azure IoT (disponibili per Python, C#, Java, ecc.), puoi accedere facilmente a queste proprietà. Gli SDK forniscono metodi per recuperare sia le proprietà di sistema che quelle applicative.
- Struttura dei topic MQTT: Azure IoT Hub codifica le proprietà di sistema e applicative in specifici topic MQTT. Ad esempio, la struttura del topic può includere proprietà come parametri di query.
Per una guida dettagliata, consulta le seguenti risorse:
Di seguito un esempio di script Python per interagire con Azure IoT Hub utilizzando MQTT:
import paho.mqtt.client as mqtt
from time import sleep, time
from ssl import SSLContext, PROTOCOL_TLS_CLIENT, CERT_REQUIRED
import urllib.parse
import hmac
import hashlib
import base64
# Sostituisci con i dettagli del tuo IoT Hub
IOT_HUB_NAME = "{iothub_name}" # Nome dell'IoT Hub
IOT_HUB_DEVICE_ID = "{device_id}" # ID del dispositivo
DEVICE_KEY = "{device_primary_key}" # Chiave simmetrica del dispositivo
# Dettagli connessione MQTT
MQTT_BROKER = f"{IOT_HUB_NAME}.azure-devices.net"
MQTT_PORT = 8883
MQTT_TOPIC_PUB = f"devices/{IOT_HUB_DEVICE_ID}/messages/events/"
MQTT_TOPIC_SUB = f"devices/{IOT_HUB_DEVICE_ID}/messages/devicebound/#"
def generate_sas_token(uri, key, expiry=3600):
"""Genera un token SAS per IoT Hub"""
ttl = int(time()) + expiry
sign_key = f"{uri}\n{ttl}".encode("utf-8")
key_bytes = base64.b64decode(key.encode("utf-8"))
signature = base64.b64encode(hmac.new(key_bytes, sign_key, digestmod=hashlib.sha256).digest())
signature = urllib.parse.quote(signature.decode("utf-8"))
return f"SharedAccessSignature sr={uri}&sig={signature}&se={ttl}"
# Genera il token SAS
resource_uri = f"{IOT_HUB_NAME}.azure-devices.net/devices/{IOT_HUB_DEVICE_ID}"
IOT_HUB_SAS_TOKEN = generate_sas_token(resource_uri, DEVICE_KEY)
def on_connect(mqtt_client, obj, flags, rc):
if rc == 0:
print("Connesso ad Azure IoT Hub via MQTT")
mqtt_client.subscribe(MQTT_TOPIC_SUB)
else:
print(f"Connessione fallita con codice di risultato {rc}")
def on_publish(mqtt_client, obj, mid):
print(f"Messaggio pubblicato con ID: {mid}")
def on_message(mqtt_client, obj, msg):
print(f"\nMessaggio C2D ricevuto sul topic: {msg.topic}")
print(f"Payload del messaggio: {msg.payload.decode('utf-8')}")
properties_str = msg.topic.split("devicebound/")[1] if "devicebound/" in msg.topic else ""
properties = dict(urllib.parse.parse_qsl(properties_str.replace("&", "&")))
print(" Header del messaggio estratti (proprietà di sistema):", properties)
mqtt_client = mqtt.Client(client_id=IOT_HUB_DEVICE_ID, protocol=mqtt.MQTTv311)
mqtt_client.on_connect = on_connect
mqtt_client.on_publish = on_publish
mqtt_client.on_message = on_message
mqtt_client.username_pw_set(
username=f"{IOT_HUB_NAME}.azure-devices.net/{IOT_HUB_DEVICE_ID}/?api-version=2021-04-12",
password=IOT_HUB_SAS_TOKEN
)
ssl_context = SSLContext(protocol=PROTOCOL_TLS_CLIENT)
ssl_context.load_default_certs()
ssl_context.verify_mode = CERT_REQUIRED
ssl_context.check_hostname = True
mqtt_client.tls_set_context(context=ssl_context)
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, keepalive=120)
mqtt_client.loop_start()
messages = [
{"temperature": 22.5, "humidity": 60},
{"temperature": 23.1, "humidity": 58},
{"temperature": 21.9, "humidity": 62}
]
for i, telemetry in enumerate(messages):
properties = "iothub-creation-time-utc=2025-03-12T12:34:56Z&dt-dataschema=urn:modelDefinition:12345"
topic_with_properties = f"{MQTT_TOPIC_PUB}{properties}"
print(f"Invio di telemetry[{i}]: {telemetry} con header: {properties}")
mqtt_client.publish(topic_with_properties, payload=str(telemetry), qos=1)
sleep(2)
try:
while True:
sleep(5)
except KeyboardInterrupt:
print("\nArresto del client MQTT...")
mqtt_client.loop_stop()
mqtt_client.disconnect()
Di seguito è riportato l'output di esempio:
Uno dei possibili motivi per la ricezione di messaggi duplicati è la garanzia di consegna "almeno una volta" (At-Least-Once Delivery) offerta da IoT Hub. Questo meccanismo assicura che i messaggi vengano consegnati almeno una volta, ma può portare occasionalmente alla ricezione di duplicati.
Un'altra causa comune riguarda Edge Hub, che potrebbe ritrasmettere un messaggio se non riceve un'adeguata conferma di avvenuta ricezione da parte di IoT Hub. Inoltre, problemi di rete o connessioni instabili potrebbero indurre IoT Hub a considerare un messaggio come non inviato e quindi ritrasmetterlo.
Anche Event Grid può contribuire alla duplicazione dei messaggi: il trigger di Event Grid potrebbe attivarsi più volte a causa di meccanismi di ritentativo o per via della duplicazione dei messaggi. Inoltre, l'esecuzione di Azure Functions può essere un fattore determinante: se una funzione non riesce a elaborare un messaggio entro il tempo previsto, il messaggio potrebbe essere riprocessato. È anche importante verificare le regole di routing dei messaggi in IoT Hub per assicurarsi che non siano configurate in modo da inviare lo stesso messaggio più di una volta.
Per gestire i messaggi duplicati, è consigliabile impostare un ID univoco per ogni messaggio utilizzando la proprietà MessageId quando si inviano messaggi dal modulo IoT Edge. Questo aiuta il destinatario a identificare ed eliminare eventuali duplicati. Inoltre, è possibile implementare una strategia di deduplicazione nel cloud, ad esempio memorizzando gli ID dei messaggi ricevuti in Cosmos DB o in un altro sistema di archiviazione e scartando i duplicati. Un altro metodo utile è abilitare la registrazione locale su Edge Hub per verificare se i messaggi duplicati provengono effettivamente dall’Edge Hub o se la duplicazione avviene in IoT Hub.
Per il debug del problema, è consigliabile controllare i log del modulo Edge per confermare che venga inviato un solo messaggio. È possibile abilitare il monitoraggio di IoT Hub con il comando az iot hub monitor-events
per tracciare i messaggi in tempo reale. Infine, è utile ispezionare le regole di routing dei messaggi in IoT Hub per assicurarsi che non vi siano configurazioni che causano consegne duplicate.
Sto traducendo le risposte dall'inglese, quindi mi scuso per eventuali errori grammaticali.
Spero che questo ti sia d’aiuto!
Se la risposta ti è stata utile, fai clic su Accetta risposta e lascia un voto positivo facendo clic su Sì.
Se hai altre domande, fai clic su Commenta.