Kommunicera med din IoT-hubb med hjälp av AMQP-protokollet

Azure IoT Hub stöder OASIS Advanced Message Queuing Protocol (AMQP) version 1.0 för att leverera en mängd olika funktioner via enhetsuppkopplade och tjänstinriktade slutpunkter. Det här dokumentet beskriver användningen av AMQP-klienter för att ansluta till en IoT-hubb för att använda IoT Hub-funktioner.

Tjänstklient

Anslut och autentisera till en IoT-hubb (tjänstklient)

För att ansluta till en IoT-hubb med hjälp av AMQP kan en klient använda anspråksbaserad säkerhet (CBS) eller SASL-autentisering (Simple Authentication and Security Layer).

Följande information krävs för tjänstklienten:

Information Värde
Värdnamn för IoT Hub <iot-hub-name>.azure-devices.net
Nyckelnamn service
Åtkomstnyckel En primär eller sekundär nyckel som är associerad med tjänsten
Signatur för delad åtkomst En signatur för kortlivad delad åtkomst i följande format: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Information om hur du hämtar koden för att generera den här signaturen finns i Kontrollera åtkomsten till IoT Hub.

Följande kodfragment använder uAMQP-biblioteket i Python för att ansluta till en IoT-hubb via en avsändarlänk.

import uamqp
import urllib
import time

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>'  # example: '/messages/devicebound'

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)

Anropa meddelanden från moln till enhet (tjänstklient)

Mer information om meddelandeutbytet mellan molnet och enheten mellan tjänsten och IoT-hubben och mellan enheten och IoT-hubben finns i Skicka meddelanden från molnet till enheten från din IoT-hubb. Tjänstklienten använder två länkar för att skicka meddelanden och ta emot feedback för tidigare skickade meddelanden från enheter, enligt beskrivningen i följande tabell:

Skapades av Länktyp Länksökväg beskrivning
Tjänst Avsändarlänk /messages/devicebound Meddelanden från moln till enhet som är avsedda för enheter skickas till den här länken av tjänsten. Meddelanden som skickas via den här länken har sin To egenskap inställd på målenhetens mottagarlänksökväg, /devices/<deviceID>/messages/devicebound.
Tjänst Mottagarlänk /messages/serviceBound/feedback Meddelanden om slutförande, avvisande och övergivande av feedback som kommer från enheter som tas emot på den här länken via tjänst. Mer information om feedbackmeddelanden finns i Skicka meddelanden från molnet till enheten från en IoT-hubb.

Följande kodfragment visar hur du skapar ett meddelande från moln till enhet och skickar det till en enhet med hjälp av uAMQP-biblioteket i Python.

import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full'  # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
                    application_properties=app_props)

# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()

# Close the client if it's not needed
send_client.close()

För att få feedback skapar tjänstklienten en mottagarlänk. Följande kodfragment visar hur du skapar en länk med hjälp av uAMQP-biblioteket i Python:

import json

operation = '/messages/serviceBound/feedback'

# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
    print('received a message')
    # Check content_type in message property to identify feedback messages coming from device
    if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
        msg_body_raw = msg.get_data()
        msg_body_str = ''.join(msg_body_raw)
        msg_body = json.loads(msg_body_str)
        print(json.dumps(msg_body, indent=2))
        print('******************')
        for feedback in msg_body:
            print('feedback received')
            print('\tstatusCode: ' + str(feedback['statusCode']))
            print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
            print('\tdeviceId: ' + str(feedback['deviceId']))
            print
    else:
        print('unknown message:', msg.properties.content_type)

Som du ser i föregående kod har ett feedbackmeddelande från moln till enhet en innehållstyp av program/vnd.microsoft.iothub.feedback.json. Du kan använda egenskaperna i meddelandets JSON-brödtext för att härleda leveransstatusen för det ursprungliga meddelandet:

  • Nyckeln statusCode i feedbacktexten har något av följande värden: Success, Expired, DeliveryCountExceeded, Rejected eller Purged.

  • Nyckeln deviceId i feedbacktexten har målenhetens ID.

  • Nyckeln originalMessageId i feedbacktexten har ID för det ursprungliga meddelandet från molnet till enheten som skickades av tjänsten. Du kan använda den här leveransstatusen för att korrelera feedback till meddelanden från moln till enhet.

Ta emot telemetrimeddelanden (tjänstklient)

Som standard lagrar din IoT-hubb inmatade enhetstelemetrimeddelanden i en inbyggd händelsehubb. Tjänstklienten kan använda AMQP-protokollet för att ta emot de lagrade händelserna.

För detta ändamål måste tjänstklienten först ansluta till IoT Hub-slutpunkten och ta emot en omdirigeringsadress till de inbyggda händelsehubbarna. Tjänstklienten använder sedan den angivna adressen för att ansluta till den inbyggda händelsehubben.

I varje steg måste klienten presentera följande information:

  • Giltiga autentiseringsuppgifter för tjänsten (signaturtoken för delad åtkomst för tjänsten).

  • En välformaterad sökväg till den konsumentgrupppartition som den avser att hämta meddelanden från. För en viss konsumentgrupp och partitions-ID har sökvägen följande format: /messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id> (standardkonsumentgruppen är $Default).

  • Ett valfritt filtreringspredikat för att ange en startpunkt i partitionen. Det här predikatet kan vara i form av ett sekvensnummer, förskjutning eller enqueued tidsstämpel.

Följande kodfragment använder uAMQP-biblioteket i Python för att demonstrera föregående steg:

import json
import uamqp
import urllib
import time

# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
    consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI


def set_endpoint_filter(uri, endpoint_filter=''):
    source_uri = uamqp.address.Source(uri)
    source_uri.set_filter(endpoint_filter)
    return source_uri


receive_client = uamqp.ReceiveClient(
    set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
    batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
    # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
    receive_client.close()

    sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
        redirect.address, policy_name, access_key)
    receive_client = uamqp.ReceiveClient(set_endpoint_filter(
        redirect.address, endpoint_filter), auth=sas_auth, debug=True)

# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
    print('*** received a message ***')
    print(''.join(msg.get_data()))
    print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
    print('\t: ' + str(msg.annotations['x-opt-offset']))
    print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

För ett visst enhets-ID använder IoT-hubben en hash av enhets-ID:t för att avgöra vilken partition som ska lagras i dess meddelanden. Föregående kodfragment visar hur händelser tas emot från en enda sådan partition. Observera dock att ett typiskt program ofta behöver hämta händelser som lagras i alla händelsehubbpartitioner.

Enhetsklient

Anslut och autentisera till en IoT-hubb (enhetsklient)

Om du vill ansluta till en IoT-hubb med hjälp av AMQP kan en enhet använda anspråksbaserad säkerhetsautentisering (CBS) eller SASL-autentisering (Simple Authentication and Security Layer).

Följande information krävs för enhetsklienten:

Information Värde
Värdnamn för IoT Hub <iot-hub-name>.azure-devices.net
Åtkomstnyckel En primär eller sekundär nyckel som är associerad med enheten
Signatur för delad åtkomst En signatur för kortlivad delad åtkomst i följande format: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Information om hur du hämtar koden för att generera den här signaturen finns i Kontrollera åtkomsten till IoT Hub.

Följande kodfragment använder uAMQP-biblioteket i Python för att ansluta till en IoT-hubb via en avsändarlänk.

import uamqp
import urllib
import uuid

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
    device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
    hostname=hostname, device_id=device_id), access_key, None)

# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)

Följande länksökvägar stöds som enhetsåtgärder:

Skapades av Länktyp Länksökväg beskrivning
Enheter Mottagarlänk /devices/<deviceID>/messages/devicebound Meddelanden från moln till enhet som är avsedda för enheter tas emot på den här länken av varje målenhet.
Enheter Avsändarlänk /devices/<deviceID>/messages/events Meddelanden från enhet till moln som skickas från en enhet skickas via den här länken.
Enheter Avsändarlänk /messages/serviceBound/feedback Feedback om meddelanden från moln till enhet som skickas till tjänsten via den här länken av enheter.

Ta emot kommandon från moln till enhet (enhetsklient)

Moln-till-enhet-kommandon som skickas till enheter anländer på en /devices/<deviceID>/messages/devicebound länk. Enheter kan ta emot dessa meddelanden i batchar och använda nyttolasten för meddelandedata, meddelandeegenskaper, anteckningar eller programegenskaper i meddelandet efter behov.

Följande kodfragment använder uAMQP-biblioteket i Python) för att ta emot meddelanden från molnet till enheten av en enhet.

# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
    device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')
        print(''.join(msg.get_data()))

        # Property 'to' is set to: '/devices/device1/messages/devicebound',
        print('\tto:                     ' + str(msg.properties.to))

        # Property 'message_id' is set to value provided by the service
        print('\tmessage_id:             ' + str(msg.properties.message_id))

        # Other properties are present if they were provided by the service
        print('\tcreation_time:          ' + str(msg.properties.creation_time))
        print('\tcorrelation_id:         ' +
              str(msg.properties.correlation_id))
        print('\tcontent_type:           ' + str(msg.properties.content_type))
        print('\treply_to_group_id:      ' +
              str(msg.properties.reply_to_group_id))
        print('\tsubject:                ' + str(msg.properties.subject))
        print('\tuser_id:                ' + str(msg.properties.user_id))
        print('\tgroup_sequence:         ' +
              str(msg.properties.group_sequence))
        print('\tcontent_encoding:       ' +
              str(msg.properties.content_encoding))
        print('\treply_to:               ' + str(msg.properties.reply_to))
        print('\tabsolute_expiry_time:   ' +
              str(msg.properties.absolute_expiry_time))
        print('\tgroup_id:               ' + str(msg.properties.group_id))

        # Message sequence number in the built-in event hub
        print('\tx-opt-sequence-number:  ' +
              str(msg.annotations['x-opt-sequence-number']))

Skicka telemetrimeddelanden (enhetsklient)

Du kan också skicka telemetrimeddelanden från en enhet med hjälp av AMQP. Enheten kan också ange en ordlista med programegenskaper eller olika meddelandeegenskaper, till exempel meddelande-ID.

Om du vill dirigera meddelanden baserat på meddelandetexten måste du ange content_type egenskapen till application/json;charset=utf-8. Mer information om hur du dirigerar meddelanden antingen baserat på meddelandeegenskaper eller meddelandetext finns i dokumentationen om frågesyntax för IoT Hub-meddelanderoutning.

Följande kodfragment använder uAMQP-biblioteket i Python för att skicka meddelanden från enhet till moln från en enhet.

# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)

send_client.queue_message(message)
results = send_client.send_all_messages()

for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print result

Ytterligare kommentarer

  • AMQP-anslutningarna kan störas på grund av ett nätverksfel eller förfallodatumet för autentiseringstoken (genereras i koden). Tjänstklienten måste hantera dessa omständigheter och återupprätta anslutningen och länkarna om det behövs. Om en autentiseringstoken upphör att gälla kan klienten undvika en anslutningsminskning genom att proaktivt förnya token innan den upphör att gälla.

  • Klienten måste ibland kunna hantera länkomdirigeringar korrekt. Information om en sådan åtgärd finns i din AMQP-klientdokumentation.

Nästa steg

Mer information om AMQP-protokollet finns i AMQP v1.0-specifikationen.

Mer information om IoT Hub-meddelanden finns i: