Duplicate Messages in Azure IoT Hub – Need to Inspect MQTT Message Header

Andrea Foti 20 Punti di reputazione
2025-03-10T10:23:24.7866667+00:00

We are experiencing an issue with duplicate messages when using Azure IoT Hub. Our setup involves an IoT device equipped with a Quectel module, which communicates with Azure IoT Hub via MQTT. We retrieve the messages from IoT Hub using the native Event Hub endpoint.

Our concern is that we are receiving duplicate messages, and we suspect this behavior is related to the MQTT Quality of Service (QoS) level 1, which guarantees message delivery at least once. This could result in multiple deliveries if the cloud acknowledgment (ACK) is lost.

To better understand the root cause, we need to inspect the low-level MQTT message header. Specifically, we are looking for the Duplicate flag in the MQTT control byte, which should be the first byte of the message structure:

  • 4 MSB: Message type (for PUBLISH, this should be 3).
  • 4 LSB: Control flags:
    • Bit 3: Duplicate message flag.
      • Bits 1-2: QoS level.
        • Bit 0: Retain flag.

From our recent tests, we have strong evidence that this issue originates from the MQTT stack implementation in the modem. The most likely scenario is that the modem keeps retransmitting the message until it receives at least one ACK. If multiple messages were already sent, they might still be delivered even after the first ACK is received. The only way to avoid this would be to use QoS 2, which Azure does not seem to support.

To confirm this hypothesis, we need to check the raw MQTT message headers on the cloud side. If we can access this data, we can determine whether the duplicate messages are due to the MQTT QoS 1 behavior. Additionally, the fact that this issue occurs more frequently in areas with poor network coverage further supports our assumption.

Since duplicate messages pose a significant issue for our system and there is no support for QoS 2, we need a way to verify the MQTT headers to analyze the message flow properly. Can you provide us with a way to inspect this data from the Azure IoT Hub side?

Desktop virtuale Azure
Desktop virtuale Azure
Un servizio di virtualizzazione di Microsoft desktop e app eseguito in Azure. Precedentemente noto come Desktop virtuale Windows.
51 domande
{count} voti

Risposta accettata
  1. Sampath 3,750 Punti di reputazione Personale Esterno Microsoft Moderatore
    2025-03-12T11:38:43.21+00:00

    Ciao @Andrea Foti,

    Per leggere gli header di un messaggio MQTT su Azure IoT Hub, puoi seguire questo approccio:

    • Proprietà di sistema: Azure IoT Hub include proprietà di sistema negli header dei messaggi MQTT. Queste proprietà forniscono metadati sul messaggio, come l'ID del dispositivo, l'ID del messaggio e il timestamp. Puoi accedere a queste proprietà durante l'elaborazione del messaggio.
    • Proprietà applicative personalizzate: Puoi definire proprietà applicative personalizzate negli header dei messaggi. Si tratta di coppie chiave-valore che puoi impostare quando invii un messaggio dal tuo dispositivo.
    • Utilizzo degli SDK di Azure IoT: Se utilizzi gli SDK di Azure IoT (disponibili per Python, C#, Java, ecc.), puoi accedere facilmente a queste proprietà. Gli SDK forniscono metodi per recuperare sia le proprietà di sistema che quelle applicative.
    • Struttura dei topic MQTT: Azure IoT Hub codifica le proprietà di sistema e applicative in specifici topic MQTT. Ad esempio, la struttura del topic può includere proprietà come parametri di query.

    Per una guida dettagliata, consulta le seguenti risorse:

    Di seguito un esempio di script Python per interagire con Azure IoT Hub utilizzando MQTT:

    import paho.mqtt.client as mqtt
    
    from time import sleep, time
    
    from ssl import SSLContext, PROTOCOL_TLS_CLIENT, CERT_REQUIRED
    
    import urllib.parse
    
    import hmac
    
    import hashlib
    
    import base64
    
    # Sostituisci con i dettagli del tuo IoT Hub
    
    IOT_HUB_NAME = "{iothub_name}"  # Nome dell'IoT Hub
    
    IOT_HUB_DEVICE_ID = "{device_id}"  # ID del dispositivo
    
    DEVICE_KEY = "{device_primary_key}"  # Chiave simmetrica del dispositivo
    
    # Dettagli connessione MQTT
    
    MQTT_BROKER = f"{IOT_HUB_NAME}.azure-devices.net"
    
    MQTT_PORT = 8883
    
    MQTT_TOPIC_PUB = f"devices/{IOT_HUB_DEVICE_ID}/messages/events/"
    
    MQTT_TOPIC_SUB = f"devices/{IOT_HUB_DEVICE_ID}/messages/devicebound/#"
    
    def generate_sas_token(uri, key, expiry=3600):
    
        """Genera un token SAS per IoT Hub"""
    
        ttl = int(time()) + expiry
    
        sign_key = f"{uri}\n{ttl}".encode("utf-8")
    
        key_bytes = base64.b64decode(key.encode("utf-8"))
    
        signature = base64.b64encode(hmac.new(key_bytes, sign_key, digestmod=hashlib.sha256).digest())
    
        signature = urllib.parse.quote(signature.decode("utf-8"))
    
        return f"SharedAccessSignature sr={uri}&sig={signature}&se={ttl}"
    
    # Genera il token SAS
    
    resource_uri = f"{IOT_HUB_NAME}.azure-devices.net/devices/{IOT_HUB_DEVICE_ID}"
    
    IOT_HUB_SAS_TOKEN = generate_sas_token(resource_uri, DEVICE_KEY)
    
    def on_connect(mqtt_client, obj, flags, rc):
    
        if rc == 0:
    
            print("Connesso ad Azure IoT Hub via MQTT")
    
            mqtt_client.subscribe(MQTT_TOPIC_SUB)
    
        else:
    
            print(f"Connessione fallita con codice di risultato {rc}")
    
    def on_publish(mqtt_client, obj, mid):
    
        print(f"Messaggio pubblicato con ID: {mid}")
    
    def on_message(mqtt_client, obj, msg):
    
        print(f"\nMessaggio C2D ricevuto sul topic: {msg.topic}")
    
        print(f"Payload del messaggio: {msg.payload.decode('utf-8')}")
    
        properties_str = msg.topic.split("devicebound/")[1] if "devicebound/" in msg.topic else ""
    
        properties = dict(urllib.parse.parse_qsl(properties_str.replace("&", "&")))
    
        print(" Header del messaggio estratti (proprietà di sistema):", properties)
    
    mqtt_client = mqtt.Client(client_id=IOT_HUB_DEVICE_ID, protocol=mqtt.MQTTv311)
    
    mqtt_client.on_connect = on_connect
    
    mqtt_client.on_publish = on_publish
    
    mqtt_client.on_message = on_message
    
    mqtt_client.username_pw_set(
    
        username=f"{IOT_HUB_NAME}.azure-devices.net/{IOT_HUB_DEVICE_ID}/?api-version=2021-04-12",
    
        password=IOT_HUB_SAS_TOKEN
    
    )
    
    ssl_context = SSLContext(protocol=PROTOCOL_TLS_CLIENT)
    
    ssl_context.load_default_certs()
    
    ssl_context.verify_mode = CERT_REQUIRED
    
    ssl_context.check_hostname = True
    
    mqtt_client.tls_set_context(context=ssl_context)
    
    mqtt_client.connect(MQTT_BROKER, MQTT_PORT, keepalive=120)
    
    mqtt_client.loop_start()
    
    messages = [
    
        {"temperature": 22.5, "humidity": 60},
    
        {"temperature": 23.1, "humidity": 58},
    
        {"temperature": 21.9, "humidity": 62}
    
    ]
    
    for i, telemetry in enumerate(messages):
    
        properties = "iothub-creation-time-utc=2025-03-12T12:34:56Z&dt-dataschema=urn:modelDefinition:12345"
    
        topic_with_properties = f"{MQTT_TOPIC_PUB}{properties}"
    
        print(f"Invio di telemetry[{i}]: {telemetry} con header: {properties}")
    
        mqtt_client.publish(topic_with_properties, payload=str(telemetry), qos=1)
    
        sleep(2)
    
    try:
    
        while True:
    
            sleep(5)
    
    except KeyboardInterrupt:
    
        print("\nArresto del client MQTT...")
    
        mqtt_client.loop_stop()
    
        mqtt_client.disconnect()
    

    Di seguito è riportato l'output di esempio:MQTT Headers Screenshot

    Uno dei possibili motivi per la ricezione di messaggi duplicati è la garanzia di consegna "almeno una volta" (At-Least-Once Delivery) offerta da IoT Hub. Questo meccanismo assicura che i messaggi vengano consegnati almeno una volta, ma può portare occasionalmente alla ricezione di duplicati.

    Un'altra causa comune riguarda Edge Hub, che potrebbe ritrasmettere un messaggio se non riceve un'adeguata conferma di avvenuta ricezione da parte di IoT Hub. Inoltre, problemi di rete o connessioni instabili potrebbero indurre IoT Hub a considerare un messaggio come non inviato e quindi ritrasmetterlo.

    Anche Event Grid può contribuire alla duplicazione dei messaggi: il trigger di Event Grid potrebbe attivarsi più volte a causa di meccanismi di ritentativo o per via della duplicazione dei messaggi. Inoltre, l'esecuzione di Azure Functions può essere un fattore determinante: se una funzione non riesce a elaborare un messaggio entro il tempo previsto, il messaggio potrebbe essere riprocessato. È anche importante verificare le regole di routing dei messaggi in IoT Hub per assicurarsi che non siano configurate in modo da inviare lo stesso messaggio più di una volta.

    Per gestire i messaggi duplicati, è consigliabile impostare un ID univoco per ogni messaggio utilizzando la proprietà MessageId quando si inviano messaggi dal modulo IoT Edge. Questo aiuta il destinatario a identificare ed eliminare eventuali duplicati. Inoltre, è possibile implementare una strategia di deduplicazione nel cloud, ad esempio memorizzando gli ID dei messaggi ricevuti in Cosmos DB o in un altro sistema di archiviazione e scartando i duplicati. Un altro metodo utile è abilitare la registrazione locale su Edge Hub per verificare se i messaggi duplicati provengono effettivamente dall’Edge Hub o se la duplicazione avviene in IoT Hub.

    Per il debug del problema, è consigliabile controllare i log del modulo Edge per confermare che venga inviato un solo messaggio. È possibile abilitare il monitoraggio di IoT Hub con il comando az iot hub monitor-events per tracciare i messaggi in tempo reale. Infine, è utile ispezionare le regole di routing dei messaggi in IoT Hub per assicurarsi che non vi siano configurazioni che causano consegne duplicate.

    Sto traducendo le risposte dall'inglese, quindi mi scuso per eventuali errori grammaticali.

    Spero che questo ti sia d’aiuto!

    Se la risposta ti è stata utile, fai clic su Accetta risposta e lascia un voto positivo facendo clic su .

    accetta

    Se hai altre domande, fai clic su Commenta.

    1 persona ha trovato utile questa risposta.
    0 commenti Nessun commento

0 risposte aggiuntive

Ordina per: Più utili

Risposta

Le risposte possono essere contrassegnate come risposte accettate dall'autore della domanda. Ciò consente agli utenti di sapere che la risposta ha risolto il problema dell'autore.