Kommunikáció az IoT Hubbal az AMQP protokoll használatával

Az Azure IoT Hub támogatja az OASIS Advanced Message Queuing Protocol (AMQP) 1.0-s verzióját, hogy különféle funkciókat biztosítson az eszköz- és szolgáltatásoldali végpontokon keresztül. Ez a dokumentum azt ismerteti, hogy az AMQP-ügyfelek hogyan csatlakozhatnak IoT Hubhoz az IoT Hub funkcióinak használatához.

Szolgáltatásügyfél

IoT Hub (szolgáltatásügyfél) Csatlakozás és hitelesítése

Ha az AMQP használatával szeretne csatlakozni egy IoT Hubhoz, az ügyfél használhatja a jogcímalapú biztonsági (CBS) vagy az egyszerű hitelesítési és biztonsági réteg (SASL) hitelesítést.

A szolgáltatásügyfélhöz a következő információk szükségesek:

Tájékoztatás Érték
IoT Hub gazdagépneve <iot-hub-name>.azure-devices.net
Kulcs neve service
Hozzáférési kulcs A szolgáltatáshoz társított elsődleges vagy másodlagos kulcs
Közös hozzáférésű jogosultságkód Rövid élettartamú közös hozzáférésű jogosultságkód a következő formátumban: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Az aláírás létrehozásához szükséges kód lekéréséhez tekintse meg az IoT Hubhoz való hozzáférés szabályozása című témakört.

Az alábbi kódrészlet a Python uAMQP-kódtára használatával csatlakozik egy IoT Hubhoz egy feladó hivatkozásán keresztül.

import uamqp
import urllib
import time

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>'  # example: '/messages/devicebound'

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)

Felhőből eszközre irányuló üzenetek meghívása (szolgáltatásügyfél)

A szolgáltatás és az IoT Hub, valamint az eszköz és az IoT Hub közötti felhőalapú üzenetcseréről további információt az IoT Hubról felhőből eszközre irányuló üzenetek küldése című témakörben talál. A szolgáltatásügyfél két hivatkozással küld üzeneteket, és visszajelzést kap az eszközökről korábban küldött üzenetekről, az alábbi táblázatban leírtak szerint:

Létrehozó Hivatkozás típusa Elérési út csatolása Leírás
Szolgáltatás Feladó hivatkozása /messages/devicebound Az eszközökre szánt felhőalapú üzeneteket a szolgáltatás erre a hivatkozásra küldi. Az ezen a hivatkozáson keresztül küldött üzenetek tulajdonsága To a céleszköz fogadókapcsolati elérési útjára van beállítva. /devices/<deviceID>/messages/devicebound
Szolgáltatás Fogadó hivatkozása /messages/serviceBound/feedback Befejező, elutasító és megszüntető visszajelzési üzenetek, amelyek a szolgáltatás által ezen a hivatkozáson kapott eszközökről érkeznek. A visszajelzési üzenetekről további információt az IoT Hubról felhőből eszközre irányuló üzenetek küldése című témakörben talál.

Az alábbi kódrészlet bemutatja, hogyan hozhat létre felhőből eszközre irányuló üzenetet, és hogyan küldheti el egy eszközre az uAMQP-kódtár használatával a Pythonban.

import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full'  # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
                    application_properties=app_props)

# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()

# Close the client if it's not needed
send_client.close()

Visszajelzés fogadásához a szolgáltatásügyfél létrehoz egy fogadóhivatkozást. Az alábbi kódrészlet bemutatja, hogyan hozhat létre hivatkozást az uAMQP-kódtár használatával a Pythonban:

import json

operation = '/messages/serviceBound/feedback'

# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
    print('received a message')
    # Check content_type in message property to identify feedback messages coming from device
    if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
        msg_body_raw = msg.get_data()
        msg_body_str = ''.join(msg_body_raw)
        msg_body = json.loads(msg_body_str)
        print(json.dumps(msg_body, indent=2))
        print('******************')
        for feedback in msg_body:
            print('feedback received')
            print('\tstatusCode: ' + str(feedback['statusCode']))
            print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
            print('\tdeviceId: ' + str(feedback['deviceId']))
            print
    else:
        print('unknown message:', msg.properties.content_type)

Ahogy az előző kódban is látható, a felhőből az eszközre irányuló visszajelzési üzenet tartalomtípussal rendelkezik az alkalmazás/vnd.microsoft.iothub.feedback.json. Az üzenet JSON-törzsének tulajdonságaival következtethet az eredeti üzenet kézbesítési állapotára:

  • A visszajelzési törzsben található kulcs statusCode a következő értékek egyikével rendelkezik: Success, Expired, DeliveryCountExceeded, Rejected vagy Purged.

  • A visszajelzési törzsben található kulcs deviceId a céleszköz azonosítójával rendelkezik.

  • A originalMessageId visszajelzési törzs kulcsának azonosítója a szolgáltatás által küldött eredeti, felhőből eszközre irányuló üzenet. Ezzel a kézbesítési állapottal korrelálhatja a visszajelzéseket a felhőből az eszközre irányuló üzenetekhez.

Telemetriai üzenetek fogadása (szolgáltatásügyfél)

Az IoT Hub alapértelmezés szerint egy beépített eseményközpontban tárolja a betöltött eszköz telemetriai üzeneteit. A szolgáltatásügyfél az AMQP protokoll használatával fogadhatja a tárolt eseményeket.

Ehhez a szolgáltatásügyfélnek először csatlakoznia kell az IoT Hub végponthoz, és átirányítási címet kell kapnia a beépített eseményközpontokhoz. A szolgáltatásügyfél ezután a megadott címmel csatlakozik a beépített eseményközponthoz.

Az ügyfélnek minden lépésben a következő információkat kell bemutatnia:

  • Érvényes szolgáltatás hitelesítő adatai (szolgáltatás közös hozzáférésű jogosultságkód jogkivonata).

  • Egy jól formázott elérési út ahhoz a fogyasztóicsoport-partícióhoz, amelyből üzeneteket kíván lekérni. Egy adott fogyasztói csoport és partícióazonosító esetében az elérési út formátuma a következő: /messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id> (az alapértelmezett fogyasztói csoport).$Default

  • Választható szűrési predikátum, amely kijelöl egy kezdőpontot a partícióban. Ez a predikátum lehet sorszám, eltolás vagy lekérdezett időbélyeg formájában.

Az alábbi kódrészlet a Python uAMQP-kódtárának használatával mutatja be az előző lépéseket:

import json
import uamqp
import urllib
import time

# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
    consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI


def set_endpoint_filter(uri, endpoint_filter=''):
    source_uri = uamqp.address.Source(uri)
    source_uri.set_filter(endpoint_filter)
    return source_uri


receive_client = uamqp.ReceiveClient(
    set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
    batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
    # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
    receive_client.close()

    sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
        redirect.address, policy_name, access_key)
    receive_client = uamqp.ReceiveClient(set_endpoint_filter(
        redirect.address, endpoint_filter), auth=sas_auth, debug=True)

# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
    print('*** received a message ***')
    print(''.join(msg.get_data()))
    print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
    print('\t: ' + str(msg.annotations['x-opt-offset']))
    print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

Egy adott eszközazonosító esetében az IoT Hub az eszközazonosító kivonatával határozza meg, hogy melyik partíción tárolja az üzeneteket. Az előző kódrészlet bemutatja, hogyan érkeznek események egyetlen ilyen partícióról. Vegye figyelembe azonban, hogy egy tipikus alkalmazásnak gyakran le kell kérnie az összes eseményközpont partíciójában tárolt eseményeket.

Eszközügyfél

IoT Hub (eszközügyfél) Csatlakozás és hitelesítése

Ha AMQP használatával szeretne csatlakozni egy IoT Hubhoz, az eszköz jogcímalapú biztonsági (CBS) vagy egyszerű hitelesítési és biztonsági rétegbeli (SASL) hitelesítést használhat.

Az eszközügyfélhöz a következő információk szükségesek:

Tájékoztatás Érték
IoT Hub gazdagépneve <iot-hub-name>.azure-devices.net
Hozzáférési kulcs Az eszközhöz társított elsődleges vagy másodlagos kulcs
Közös hozzáférésű jogosultságkód Rövid élettartamú közös hozzáférésű jogosultságkód a következő formátumban: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Az aláírás létrehozásához szükséges kód lekéréséhez tekintse meg az IoT Hubhoz való hozzáférés szabályozása című témakört.

Az alábbi kódrészlet a Python uAMQP-kódtára használatával csatlakozik egy IoT Hubhoz egy feladó hivatkozásán keresztül.

import uamqp
import urllib
import uuid

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
    device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
    hostname=hostname, device_id=device_id), access_key, None)

# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)

Eszközműveletként a következő hivatkozási útvonalak támogatottak:

Létrehozó Hivatkozás típusa Elérési út csatolása Leírás
Eszközök Fogadó hivatkozása /devices/<deviceID>/messages/devicebound Az eszközökre szánt felhőalapú üzeneteket ezen a hivatkozáson minden céleszköz fogadja.
Eszközök Feladó hivatkozása /devices/<deviceID>/messages/events Az eszközről küldött eszközről a felhőbe irányuló üzenetek ezen a hivatkozáson keresztül lesznek elküldve.
Eszközök Feladó hivatkozása /messages/serviceBound/feedback Az eszközök által ezen a hivatkozáson keresztül küldött, a szolgáltatásnak küldött, felhőből eszközre irányuló üzenetekre vonatkozó visszajelzések.

Felhő–eszköz parancsok fogadása (eszközügyfél)

Az eszközöknek küldött felhőalapú parancsok egy /devices/<deviceID>/messages/devicebound hivatkozásra érkeznek. Az eszközök kötegekben fogadhatják ezeket az üzeneteket, és szükség szerint használhatják az üzenet adatadatait, üzenettulajdonságait, széljegyzeteit vagy alkalmazástulajdonságait.

Az alábbi kódrészlet a Python uAMQP-kódtárának használatával fogadja a felhőből az eszközre irányuló üzeneteket.

# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
    device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')
        print(''.join(msg.get_data()))

        # Property 'to' is set to: '/devices/device1/messages/devicebound',
        print('\tto:                     ' + str(msg.properties.to))

        # Property 'message_id' is set to value provided by the service
        print('\tmessage_id:             ' + str(msg.properties.message_id))

        # Other properties are present if they were provided by the service
        print('\tcreation_time:          ' + str(msg.properties.creation_time))
        print('\tcorrelation_id:         ' +
              str(msg.properties.correlation_id))
        print('\tcontent_type:           ' + str(msg.properties.content_type))
        print('\treply_to_group_id:      ' +
              str(msg.properties.reply_to_group_id))
        print('\tsubject:                ' + str(msg.properties.subject))
        print('\tuser_id:                ' + str(msg.properties.user_id))
        print('\tgroup_sequence:         ' +
              str(msg.properties.group_sequence))
        print('\tcontent_encoding:       ' +
              str(msg.properties.content_encoding))
        print('\treply_to:               ' + str(msg.properties.reply_to))
        print('\tabsolute_expiry_time:   ' +
              str(msg.properties.absolute_expiry_time))
        print('\tgroup_id:               ' + str(msg.properties.group_id))

        # Message sequence number in the built-in event hub
        print('\tx-opt-sequence-number:  ' +
              str(msg.annotations['x-opt-sequence-number']))

Telemetriai üzenetek küldése (eszközügyfél)

Telemetriai üzeneteket is küldhet egy eszközről az AMQP használatával. Az eszköz opcionálisan biztosíthatja az alkalmazástulajdonságok szótárát, vagy különböző üzenettulajdonságokat, például üzenetazonosítót.

Ha üzeneteket szeretne átirányítani az üzenettörzs alapján, a tulajdonságot application/json;charset=utf-8be kell állítaniacontent_type. Ha többet szeretne megtudni az üzenetek üzenettulajdonságok vagy üzenettörzs alapján történő útválasztásáról, tekintse meg az IoT Hub üzenet-útválasztási lekérdezés szintaxisának dokumentációját.

Az alábbi kódrészlet a Python uAMQP-kódtára használatával küld eszközről felhőbe irányuló üzeneteket egy eszközről.

# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)

send_client.queue_message(message)
results = send_client.send_all_messages()

for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print result

További megjegyzések

  • Az AMQP-kapcsolatok hálózati hiba vagy a hitelesítési jogkivonat (a kódban generált) lejárata miatt megszakadhatnak. A szolgáltatásügyfélnek kezelnie kell ezeket a körülményeket, és szükség esetén újra létre kell tennie a kapcsolatot és a kapcsolatokat. Ha egy hitelesítési jogkivonat lejár, az ügyfél elkerülheti a kapcsolat megszakadását, ha proaktívan megújítja a jogkivonatot a lejárat előtt.

  • Az ügyfélnek időnként képesnek kell lennie a hivatkozásátirányítások helyes kezelésére. Egy ilyen művelet megismeréséhez tekintse meg az AMQP-ügyfél dokumentációját.

Következő lépések

Az AMQP Protokollról az AMQP 1.0-s verzióra vonatkozó specifikációban talál további információt.

Az IoT Hub-üzenetkezeléssel kapcsolatos további információkért lásd: