使用 AMQP 通訊協定來與 IoT 中樞進行通訊

Azure IoT 中樞支援 OASIS 進階訊息佇列通訊協定 (AMQP) 1.0 版,透過裝置對應和服務對應端點提供各種功能。 本文件描述使用 AMQP 用戶端連線到 IoT 中樞以使用 IoT 中樞功能。

服務用戶端

連線並驗證 IoT 中樞 (服務用戶端)

若要使用 AMQP 連線到 IoT 中樞,用戶端可以使用宣告式安全性 (CBS)簡單驗證及安全性階層 (SASL) 驗證

服務用戶端需要下列資訊:

資訊
IoT 中樞主機名稱 <iot-hub-name>.azure-devices.net
索引鍵名稱 service
存取金鑰 與服務相關聯的主要或次要金鑰
共用存取簽章 短期的共用存取簽章,格式如下︰SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}。 若要取得產生此簽章的程式碼,請參閱控制 IoT 中樞的存取權

下列程式碼片段使用 Python 中的 uAMQP 程式庫,透過傳送者連結連線到 IoT 中樞。

import uamqp
import urllib
import time

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>'  # example: '/messages/devicebound'

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)

叫用雲端到裝置的訊息 (服務用戶端)

若要了解服務和 IoT 中樞之間,以及裝置和 IoT 中樞之間的雲端到裝置訊息交換,請參閱 從 IoT 中樞傳送雲端到裝置訊息。 服務用戶端會使用兩個連結來傳送訊息,並接收先前從裝置傳送訊息的意見反應,如下表所述:

建立者: 連結類型 連結路徑 描述
服務 傳送端連結 /messages/devicebound 以裝置為目的地的雲端到裝置的訊息會由服務傳送到此連結。 透過此連結傳送的訊息,其 To 屬性會設定為目標裝置的接收者連結路徑 /devices/<deviceID>/messages/devicebound
服務 接收端連結 /messages/serviceBound/feedback 完成、拒絕和放棄意見反應訊息,這些訊息來自於透過服務在此連結上收到的裝置。 如需關於意見反應訊息的詳細資訊,請參閱從 IoT 中樞傳送雲端到裝置的訊息

下列程式碼片段示範如何藉由使用 Python 中的 uAMQP 程式庫,建立雲端到裝置的訊息,並將其傳送至裝置。

import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full'  # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
                    application_properties=app_props)

# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()

# Close the client if it's not needed
send_client.close()

為了接收意見反應,服務用戶端會建立接收者連結。 下列程式碼片段示範如何藉由使用 Python 中的 uAMQP 程式庫來建立連結:

import json

operation = '/messages/serviceBound/feedback'

# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
    print('received a message')
    # Check content_type in message property to identify feedback messages coming from device
    if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
        msg_body_raw = msg.get_data()
        msg_body_str = ''.join(msg_body_raw)
        msg_body = json.loads(msg_body_str)
        print(json.dumps(msg_body, indent=2))
        print('******************')
        for feedback in msg_body:
            print('feedback received')
            print('\tstatusCode: ' + str(feedback['statusCode']))
            print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
            print('\tdeviceId: ' + str(feedback['deviceId']))
            print
    else:
        print('unknown message:', msg.properties.content_type)

如上述程式碼所示,雲端到裝置的意見反應訊息具有 application/vnd.microsoft.iothub.feedback.json 的內容類型。 您可以使用訊息 JSON 主體中的屬性來推斷原始訊息的傳遞狀態:

  • 意見反應主體中的索引鍵 statusCode 具有下列其中一個值:Success、Expired、DeliveryCountExceeded、Rejected 或 Purged

  • 意見反應主體中的金鑰 deviceId 具有目標裝置的識別碼。

  • 意見反應本文中的金鑰 originalMessageId 具有透過服務傳送的原始雲端到裝置訊息識別碼。 您可以使用此傳遞狀態,將意見反應和雲端到裝置的訊息相互關聯。

接收遙測訊息 (服務用戶端)

根據預設,IoT 中樞會將內嵌的裝置遙測訊息儲存在內建事件中樞。 您的服務用戶端可以使用 AMQP 通訊協定來接收儲存的事件。

基於此目的,服務用戶端必須先連線到 IoT 中樞端點,並接收內建事件中樞的重新導向位址。 接著,服務用戶端會使用提供的位址連線到內建事件中樞。

在每個步驟中,用戶端必須呈現下列資訊片段:

  • 有效的服務認證 (服務共用存取簽章權杖)。

  • 取用者群組分割區格式正確的路徑,其想要從中擷取訊息。 對於指定的取用者群組和分割區識別碼,路徑格式如下:/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>(預設取用者群組為 $Default)。

  • 選擇性篩選述詞,以指定分割區中的起點。 這個述詞的格式可以是序號、位移或加入佇列的時間戳記。

下列程式碼片段使用 Python 中的 uAMQP 程式庫 來示範上述步驟:

import json
import uamqp
import urllib
import time

# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
    consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI


def set_endpoint_filter(uri, endpoint_filter=''):
    source_uri = uamqp.address.Source(uri)
    source_uri.set_filter(endpoint_filter)
    return source_uri


receive_client = uamqp.ReceiveClient(
    set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
    batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
    # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
    receive_client.close()

    sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
        redirect.address, policy_name, access_key)
    receive_client = uamqp.ReceiveClient(set_endpoint_filter(
        redirect.address, endpoint_filter), auth=sas_auth, debug=True)

# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
    print('*** received a message ***')
    print(''.join(msg.get_data()))
    print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
    print('\t: ' + str(msg.annotations['x-opt-offset']))
    print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

對於指定的裝置識別碼,IoT 中樞會使用裝置識別碼的雜湊來判斷要儲存其訊息的分割區。 上述程式碼片段示範如何從這類單一分割區接收事件。 不過,請注意,一般應用程式通常需要擷取儲存在所有事件中樞分割區的事件。

裝置用戶端

連線並驗證 IoT 中樞 (服務用戶端)

若要使用 AMQP 連線到 IoT 中樞,裝置可以使用宣告式安全性 (CBS)簡單驗證及安全性階層 (SASL) 驗證。

裝置用戶端需要下列資訊:

資訊
IoT 中樞主機名稱 <iot-hub-name>.azure-devices.net
存取金鑰 與裝置相關聯的主要或次要金鑰
共用存取簽章 短期的共用存取簽章,格式如下︰SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}。 若要取得產生此簽章的程式碼,請參閱控制 IoT 中樞的存取權

下列程式碼片段使用 Python 中的 uAMQP 程式庫,透過傳送者連結連線到 IoT 中樞。

import uamqp
import urllib
import uuid

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
    device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
    hostname=hostname, device_id=device_id), access_key, None)

# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)

下列連結路徑支援做為裝置作業:

建立者: 連結類型 連結路徑 描述
裝置 接收端連結 /devices/<deviceID>/messages/devicebound 以裝置為目的地的雲端到裝置訊息會透過每個目的地裝置,在此連結上接收。
裝置 傳送端連結 /devices/<deviceID>/messages/events 從裝置傳送的裝置到雲端訊息會透過此連結傳送。
裝置 傳送端連結 /messages/serviceBound/feedback 裝置透過此連結傳送至服務的雲端到裝置訊息意見反應。

接收雲端到裝置命令 (裝置用戶端)

傳送至裝置的雲端到裝置命令會到達 /devices/<deviceID>/messages/devicebound 連結。 裝置可以批次接收這些訊息,並視需要在訊息中使用訊息資料承載、訊息屬性、註釋或應用程式屬性。

下列程式碼片段使用 Python 中的 uAMQP 程式庫),以透過裝置接收裝置到雲端的訊息。

# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
    device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')
        print(''.join(msg.get_data()))

        # Property 'to' is set to: '/devices/device1/messages/devicebound',
        print('\tto:                     ' + str(msg.properties.to))

        # Property 'message_id' is set to value provided by the service
        print('\tmessage_id:             ' + str(msg.properties.message_id))

        # Other properties are present if they were provided by the service
        print('\tcreation_time:          ' + str(msg.properties.creation_time))
        print('\tcorrelation_id:         ' +
              str(msg.properties.correlation_id))
        print('\tcontent_type:           ' + str(msg.properties.content_type))
        print('\treply_to_group_id:      ' +
              str(msg.properties.reply_to_group_id))
        print('\tsubject:                ' + str(msg.properties.subject))
        print('\tuser_id:                ' + str(msg.properties.user_id))
        print('\tgroup_sequence:         ' +
              str(msg.properties.group_sequence))
        print('\tcontent_encoding:       ' +
              str(msg.properties.content_encoding))
        print('\treply_to:               ' + str(msg.properties.reply_to))
        print('\tabsolute_expiry_time:   ' +
              str(msg.properties.absolute_expiry_time))
        print('\tgroup_id:               ' + str(msg.properties.group_id))

        # Message sequence number in the built-in event hub
        print('\tx-opt-sequence-number:  ' +
              str(msg.annotations['x-opt-sequence-number']))

傳送遙測訊息 (裝置用戶端)

您也可以使用 AMQP 從裝置傳送遙測訊息。 裝置可以選擇性地提供應用程式屬性的字典或各種訊息屬性,例如訊息識別碼。

若要根據訊息本文路由訊息,您必須將 content_type 屬性設定為 application/json;charset=utf-8。 若要深入瞭解如何根據訊息屬性或訊息本文路由訊息,請參閱 IoT 中樞訊息路由查詢語法文件

下列程式碼片段使用 Python 中的 uAMQP 程式庫,從裝置傳送裝置到雲端的訊息。

# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)

send_client.queue_message(message)
results = send_client.send_all_messages()

for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print result

其他注意事項

  • AMQP 連線可能會因為網路問題或驗證權杖到期 (在程式碼中產生) 而中斷。 服務用戶端必須處理這些情況,並視需要重新建立連線和連結。 如果驗證權杖過期,用戶端可以在到期前主動更新權杖,以避免連線中斷。

  • 您的用戶端必須偶爾可以正確處理連結重新導向。 若要了解這類作業,請參閱您的 AMQP 用戶端文件。

下一步

若要深入了解 AMQP 通訊協定,請參閱 AMQP v1.0 規格

若要深入了解 IoT 中樞傳訊,請參閱: