Komunikace s centrem IoT pomocí protokolu AMQP

Azure IoT Hub podporuje PROTOKOL AMQP (Advanced Message Queuing Protocol) OASIS verze 1.0 , aby poskytoval celou řadu funkcí prostřednictvím koncových bodů směřujících k zařízením a službám. Tento dokument popisuje použití klientů AMQP pro připojení k centru IoT k používání funkcí ioT Hubu.

Klient služby

Připojení a ověření ve službě IoT Hub (klient služby)

Pokud se chcete připojit k centru IoT pomocí AMQP, klient může použít ověřování ZALOŽENÉ na deklaracích identit (CBS) nebo ověřování SASL (Simple Authentication and Security Layer).

Klient služby vyžaduje následující informace:

Informační Hodnota
Název hostitele služby IoT Hub <iot-hub-name>.azure-devices.net
Název klíče service
Přístupový klíč Primární nebo sekundární klíč přidružený ke službě
Sdílený přístupový podpis Krátkodobý sdílený přístupový podpis v následujícím formátu: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Pokud chcete získat kód pro generování tohoto podpisu, přečtěte si téma Řízení přístupu ke službě IoT Hub.

Následující fragment kódu používá knihovnu uAMQP v Pythonu k připojení k centru IoT prostřednictvím odkazu odesílatele.

import uamqp
import urllib
import time

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>'  # example: '/messages/devicebound'

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)

Vyvolání zpráv z cloudu na zařízení (klient služby)

Další informace o výměně zpráv typu cloud-zařízení mezi službou a centrem IoT a mezi zařízením a centrem IoT najdete v tématu Odesílání zpráv typu cloud-zařízení ze služby IoT Hub. Klient služby používá dva odkazy k odesílání zpráv a přijímání zpětné vazby pro dříve odeslané zprávy ze zařízení, jak je popsáno v následující tabulce:

Vytvořil(a) Typ odkazu Cesta odkazu Popis
Služba Odkaz odesílatele /messages/devicebound Zprávy typu cloud-zařízení určené pro zařízení se odesílají na tento odkaz službou. Zprávy odeslané přes tento odkaz mají vlastnost To nastavena na cestu odkazu příjemce cílového zařízení, /devices/<deviceID>/messages/devicebound.
Služba Odkaz přijímače /messages/serviceBound/feedback Zprávy o ukončení, zamítnutí a opuštění zpětné vazby, které pocházejí ze zařízení přijatých na tomto odkazu službou. Další informace o zprávách zpětné vazby najdete v tématu Odesílání zpráv typu cloud-zařízení z centra IoT.

Následující fragment kódu ukazuje, jak vytvořit zprávu typu cloud-zařízení a odeslat ji do zařízení pomocí knihovny uAMQP v Pythonu.

import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full'  # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
                    application_properties=app_props)

# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()

# Close the client if it's not needed
send_client.close()

Aby klient služby získal zpětnou vazbu, vytvoří odkaz příjemce. Následující fragment kódu ukazuje, jak vytvořit odkaz pomocí knihovny uAMQP v Pythonu:

import json

operation = '/messages/serviceBound/feedback'

# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
    print('received a message')
    # Check content_type in message property to identify feedback messages coming from device
    if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
        msg_body_raw = msg.get_data()
        msg_body_str = ''.join(msg_body_raw)
        msg_body = json.loads(msg_body_str)
        print(json.dumps(msg_body, indent=2))
        print('******************')
        for feedback in msg_body:
            print('feedback received')
            print('\tstatusCode: ' + str(feedback['statusCode']))
            print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
            print('\tdeviceId: ' + str(feedback['deviceId']))
            print
    else:
        print('unknown message:', msg.properties.content_type)

Jak je znázorněno v předchozím kódu, zpráva zpětné vazby typu cloud-zařízení má typ obsahu aplikace/vnd.microsoft.iothub.feedback.json. Pomocí vlastností v textu JSON zprávy můžete odvodit stav doručení původní zprávy:

  • Klíč statusCode v textu zpětné vazby má jednu z následujících hodnot: Success, Expired, DeliveryCountExceeded, Rejected nebo Purged.

  • Klíč deviceId v textu zpětné vazby má ID cílového zařízení.

  • Klíč originalMessageId v textu zpětné vazby má ID původní zprávy typu cloud-zařízení, která byla odeslána službou. Tento stav doručení můžete použít ke korelaci zpětné vazby se zprávami typu cloud-zařízení.

Příjem zpráv telemetrie (klient služby)

Ve výchozím nastavení vaše centrum IoT ukládá ingestované zprávy telemetrie zařízení do integrovaného centra událostí. Klient služby může k příjmu uložených událostí použít protokol AMQP.

Pro tento účel se klient služby musí nejprve připojit ke koncovému bodu služby IoT Hub a přijmout adresu přesměrování do integrovaných center událostí. Klient služby pak použije zadanou adresu pro připojení k integrovanému centru událostí.

V každém kroku musí klient předložit následující informace:

  • Platné přihlašovací údaje služby (token sdíleného přístupového podpisu služby).

  • Dobře naformátovaná cesta k oddílu skupiny příjemců, ze kterého hodlá načíst zprávy. Pro danou skupinu příjemců a ID oddílu má cesta následující formát: /messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id> (výchozí skupina příjemců je $Default).

  • Volitelný predikát filtrování pro určení výchozího bodu v oddílu. Tento predikát může být ve formě pořadového čísla, posunu nebo výčtu časového razítka.

Následující fragment kódu používá knihovnu uAMQP v Pythonu k předvedení předchozích kroků:

import json
import uamqp
import urllib
import time

# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
    consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI


def set_endpoint_filter(uri, endpoint_filter=''):
    source_uri = uamqp.address.Source(uri)
    source_uri.set_filter(endpoint_filter)
    return source_uri


receive_client = uamqp.ReceiveClient(
    set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
    batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
    # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
    receive_client.close()

    sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
        redirect.address, policy_name, access_key)
    receive_client = uamqp.ReceiveClient(set_endpoint_filter(
        redirect.address, endpoint_filter), auth=sas_auth, debug=True)

# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
    print('*** received a message ***')
    print(''.join(msg.get_data()))
    print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
    print('\t: ' + str(msg.annotations['x-opt-offset']))
    print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

Pro dané ID zařízení používá Centrum IoT hodnotu hash ID zařízení k určení, do kterého oddílu se mají ukládat zprávy. Předchozí fragment kódu ukazuje, jak se události přijímají z jednoho takového oddílu. Upozorňujeme však, že typická aplikace často potřebuje načíst události, které jsou uložené ve všech oddílech centra událostí.

Klient zařízení

Připojení a ověření ve službě IoT Hub (klient zařízení)

Pokud se chcete připojit k centru IoT pomocí AMQP, může zařízení používat ověřování ZALOŽENÉ na deklaracích identit (CBS) nebo ověřování SASL (Simple Authentication and Security Layer).

Klient zařízení vyžaduje následující informace:

Informační Hodnota
Název hostitele služby IoT Hub <iot-hub-name>.azure-devices.net
Přístupový klíč Primární nebo sekundární klíč přidružený k zařízení
Sdílený přístupový podpis Krátkodobý sdílený přístupový podpis v následujícím formátu: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Pokud chcete získat kód pro generování tohoto podpisu, přečtěte si téma Řízení přístupu ke službě IoT Hub.

Následující fragment kódu používá knihovnu uAMQP v Pythonu k připojení k centru IoT prostřednictvím odkazu odesílatele.

import uamqp
import urllib
import uuid

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
    device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
    hostname=hostname, device_id=device_id), access_key, None)

# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)

Jako operace zařízení se podporují následující cesty propojení:

Vytvořil(a) Typ odkazu Cesta odkazu Popis
Zařízení Odkaz přijímače /devices/<deviceID>/messages/devicebound Zprávy typu cloud-zařízení určené pro zařízení se na tomto odkazu přijímají jednotlivými cílovými zařízeními.
Zařízení Odkaz odesílatele /devices/<deviceID>/messages/events Zprávy ze zařízení do cloudu, které se odesílají ze zařízení, se posílají přes tento odkaz.
Zařízení Odkaz odesílatele /messages/serviceBound/feedback Zpětná vazba ke zprávě typu cloud-zařízení odeslaná službě prostřednictvím tohoto odkazu zařízeními.

Příjem příkazů cloud-zařízení (klient zařízení)

Příkazy typu cloud-zařízení, které se odesílají do zařízení, přicházejí na /devices/<deviceID>/messages/devicebound odkaz. Zařízení mohou tyto zprávy přijímat v dávkách a podle potřeby používat datovou část dat zprávy, vlastnosti zprávy, poznámky nebo vlastnosti aplikace.

Následující fragment kódu používá knihovnu uAMQP v Pythonu) k příjmu zpráv typu cloud-zařízení zařízením.

# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
    device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')
        print(''.join(msg.get_data()))

        # Property 'to' is set to: '/devices/device1/messages/devicebound',
        print('\tto:                     ' + str(msg.properties.to))

        # Property 'message_id' is set to value provided by the service
        print('\tmessage_id:             ' + str(msg.properties.message_id))

        # Other properties are present if they were provided by the service
        print('\tcreation_time:          ' + str(msg.properties.creation_time))
        print('\tcorrelation_id:         ' +
              str(msg.properties.correlation_id))
        print('\tcontent_type:           ' + str(msg.properties.content_type))
        print('\treply_to_group_id:      ' +
              str(msg.properties.reply_to_group_id))
        print('\tsubject:                ' + str(msg.properties.subject))
        print('\tuser_id:                ' + str(msg.properties.user_id))
        print('\tgroup_sequence:         ' +
              str(msg.properties.group_sequence))
        print('\tcontent_encoding:       ' +
              str(msg.properties.content_encoding))
        print('\treply_to:               ' + str(msg.properties.reply_to))
        print('\tabsolute_expiry_time:   ' +
              str(msg.properties.absolute_expiry_time))
        print('\tgroup_id:               ' + str(msg.properties.group_id))

        # Message sequence number in the built-in event hub
        print('\tx-opt-sequence-number:  ' +
              str(msg.annotations['x-opt-sequence-number']))

Odesílání zpráv telemetrie (klient zařízení)

Pomocí AMQP můžete také odesílat telemetrické zprávy ze zařízení. Zařízení může volitelně poskytnout slovník vlastností aplikace nebo různé vlastnosti zprávy, například ID zprávy.

Chcete-li směrovat zprávy na základě textu zprávy, je nutné nastavit content_type vlastnost na application/json;charset=utf-8hodnotu . Další informace o směrování zpráv na základě vlastností zprávy nebo textu zprávy najdete v dokumentaci k syntaxi dotazů směrování zpráv služby IoT Hub.

Následující fragment kódu používá knihovnu uAMQP v Pythonu k odesílání zpráv typu zařízení-cloud ze zařízení.

# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)

send_client.queue_message(message)
results = send_client.send_all_messages()

for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print result

Další poznámky

  • Připojení AMQP můžou být narušena kvůli výpadku sítě nebo vypršení platnosti ověřovacího tokenu (vygenerovaného v kódu). Klient služby musí tyto okolnosti zpracovat a v případě potřeby znovu obnovit připojení a propojení. Pokud vyprší platnost ověřovacího tokenu, klient se může vyhnout poklesu připojení tím, že token proaktivně obnoví před vypršením jeho platnosti.

  • Klient musí občas umět správně zpracovat přesměrování odkazů. Informace o takové operaci najdete v dokumentaci klienta AMQP.

Další kroky

Další informace o protokolu AMQP najdete ve specifikaci AMQP v1.0.

Další informace o zasílání zpráv ve službě IoT Hub najdete tady: