عملية التواصل مع مركز IoT الخاص بك باستخدام بروتوكول AMQP

يدعم Azure IoT Hub الإصدار 1.0 من OASIS Advanced Message Queuing Protocol (AMQP) لتقديم مجموعة متنوعة من الوظائف من خلال نقاط النهاية المخصصة للجهاز والمخصصة للخدمة. يصف هذا المستند استخدام عملاء AMQP للاتصال بمركز IoT لاستخدام وظيفة IoT Hub.

عميل الخدمة

الاتصال بمركز IoT (عميل الخدمة) والمصادقة عليه

للاتصال بمركز IoT باستخدام AMQP، يمكن للعميل استخدام مصادقة الأمان المستند إلى المطالبات (CBS) أو المصادقة البسيطة وطبقة الأمان (SASL).

المعلومات التالية مطلوبة لعميل الخدمة:

‏‏‏‏‏‏‏‏‏‏‏‏‏‏‏‏‏‏‏المعلومات القيمة‬
اسم مضيف مركز IoT <iot-hub-name>.azure-devices.net
اسم المفتاح service
مفتاح الوصول مفتاح أساسي أو ثانوي مقترن بالخدمة
توقيع الوصول المشترك توقيع وصول مشترك قصير الأجل بالتنسيق التالي: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. للحصول على التعليمات البرمجية لإنشاء هذا التوقيع، راجع التحكم في الوصول إلى IoT Hub.

تستخدم القصاصة البرمجية التالية مكتبة uAMQP في Python للاتصال بمركز IoT عبر ارتباط مُرسل.

import uamqp
import urllib
import time

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>'  # example: '/messages/devicebound'

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)

استدعاء رسائل من السحابة إلى الجهاز (عميل الخدمة)

للتعرف على تبادل الرسائل من السحابة إلى الجهاز بين الخدمة ومركز IoT وبين الجهاز ومركز IoT، راجع إرسال رسائل من السحابة إلى الجهاز من مركز IoT. يستخدم عميل الخدمة ارتباطين لإرسال الرسائل وتلقي ملاحظات حول الرسائل المُرسلة مسبقاً من الأجهزة، كما هو موضح في الجدول التالي:

تم الإنشاء بواسطة نوع الارتباط مسار الارتباط ‏‏الوصف
الخدمة ارتباط المُرسل /messages/devicebound ترسل الخدمة رسائل السحابة إلى الجهاز الموجه للأجهزة إلى هذا الارتباط. لدى الرسائل المُرسلة عبر هذا الارتباط تعيين الخاصية To إلى مسار ارتباط المتلقي الخاص بالجهاز الهدف، /devices/<deviceID>/messages/devicebound.
الخدمة ارتباط المستلم /messages/serviceBound/feedback رسائل ملاحظات الإكمال والرفض والتخلي التي تأتي من الأجهزة المستلمة على هذا الارتباط حسب الخدمة. لمزيد من المعلومات حول رسائل الملاحظات، راجع إرسال رسائل من السحابة إلى الجهاز من مركز IoT.

توضح القصاصة البرمجية التالية كيفية إنشاء رسالة من سحابة إلى جهاز وإرسالها إلى جهاز باستخدام مكتبة uAMQP في Python.

import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full'  # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
                    application_properties=app_props)

# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()

# Close the client if it's not needed
send_client.close()

لتلقي الملاحظات، يُنشئ عميل الخدمة ارتباطاً مستقبلاً. توضح القصاصة البرمجية التالية كيفية إنشاء ارتباط باستخدام مكتبة uAMQP في Python:

import json

operation = '/messages/serviceBound/feedback'

# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
    print('received a message')
    # Check content_type in message property to identify feedback messages coming from device
    if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
        msg_body_raw = msg.get_data()
        msg_body_str = ''.join(msg_body_raw)
        msg_body = json.loads(msg_body_str)
        print(json.dumps(msg_body, indent=2))
        print('******************')
        for feedback in msg_body:
            print('feedback received')
            print('\tstatusCode: ' + str(feedback['statusCode']))
            print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
            print('\tdeviceId: ' + str(feedback['deviceId']))
            print
    else:
        print('unknown message:', msg.properties.content_type)

كما هو موضح في التعليمات البرمجية السابقة، تحتوي رسالة الملاحظات من سحابة إلى جهاز على نوع محتوى application/vnd.microsoft.iothub.feedback.json. يمكنك استخدام الخصائص في نص JSON للرسالة للاستدلال على حالة تسليم الرسالة الأصلية:

  • يحتوي المفتاح statusCode في نص الملاحظات على إحدى القيم التالية: Success أو Expired أو DeliveryCountExceededed أو Rejected أو Purged.

  • يحتوي المفتاح deviceId في نص الملاحظات على معرّف الجهاز الهدف.

  • يحتوي المفتاح originalMessageId في نص الملاحظات على معرّف رسالة السحابة إلى الجهاز الأصلية التي أرسلتها الخدمة. يمكنك استخدام حالة التسليم هذه لربط الملاحظات بالرسائل من السحابة إلى الجهاز.

تلقي رسائل بيانات تتبع الاستخدام (عميل الخدمة)

بشكل افتراضي، يخزن مركز IoT رسائل بيانات تتبع الاستخدام للجهاز التي تم استيعابها في مركز أحداث مُضمن. يمكن لعميل الخدمة استخدام بروتوكول AMQP لتلقي الأحداث المخزنة.

لهذا الغرض، يحتاج عميل الخدمة أولاً إلى الاتصال بنقطة نهاية مركز IoT وتلقي عنوان إعادة توجيه إلى مراكز الأحداث المُضمنة. ثم يستخدم عميل الخدمة العنوان المتوفر للاتصال بمركز الأحداث المُضمن.

في كل خطوة، يحتاج العميل إلى تقديم أجزاء المعلومات التالية:

  • بيانات اعتماد الخدمة الصالحة (الرمز المميز لتوقيع الوصول المشترك للخدمة).

  • مسار منسق جيداً إلى قسم مجموعة المستهلكين الذي تنوي استرداد الرسائل منه. بالنسبة لمجموعة مستهلكين معينة ومعرّف القسم، يحتوي المسار على التنسيق التالي: /messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id> (مجموعة المستهلكين الافتراضية هي $Default).

  • تصفية تخمين اختيارية لتعيين نقطة بداية في القسم. يمكن أن يكون هذا التقييم في شكل رقم تسلسل أو إزاحة أو طابع زمني في قائمة الانتظار.

تستخدم القصاصة البرمجية التالية مكتبة uAMQP في Python لتوضيح الخطوات السابقة:

import json
import uamqp
import urllib
import time

# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
    consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI


def set_endpoint_filter(uri, endpoint_filter=''):
    source_uri = uamqp.address.Source(uri)
    source_uri.set_filter(endpoint_filter)
    return source_uri


receive_client = uamqp.ReceiveClient(
    set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
    batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
    # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
    receive_client.close()

    sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
        redirect.address, policy_name, access_key)
    receive_client = uamqp.ReceiveClient(set_endpoint_filter(
        redirect.address, endpoint_filter), auth=sas_auth, debug=True)

# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
    print('*** received a message ***')
    print(''.join(msg.get_data()))
    print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
    print('\t: ' + str(msg.annotations['x-opt-offset']))
    print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

بالنسبة لمعرّف الجهاز معين، يستخدم مركز IoT تجزئة معرّف الجهاز لتحديد القسم الذي سيتم تخزين رسائله فيه. توضح القصاصة البرمجية السابقة كيفية تلقي الأحداث من قسم واحد من هذا القبيل. ومع ذلك، لاحظ أن التطبيق النموذجي غالباً ما يحتاج إلى استرداد الأحداث المُخزنة في جميع أقسام مركز الأحداث.

عميل الجهاز

الاتصال بمركز IoT (عميل الجهاز) والمصادقة عليه

للاتصال بمركز IoT باستخدام AMQP، يمكن للجهاز استخدام مصادقة الأمان المستند إلى المطالبات (CBS) أو المصادقة البسيطة وطبقة الأمان (SASL).

المعلومات التالية مطلوبة لعميل الجهاز:

‏‏‏‏‏‏‏‏‏‏‏‏‏‏‏‏‏‏‏المعلومات القيمة‬
اسم مضيف مركز IoT <iot-hub-name>.azure-devices.net
مفتاح الوصول مفتاح أساسي أو ثانوي مقترن بالجهاز
توقيع الوصول المشترك توقيع وصول مشترك قصير الأجل بالتنسيق التالي: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. للحصول على التعليمات البرمجية لإنشاء هذا التوقيع، راجع التحكم في الوصول إلى IoT Hub.

تستخدم القصاصة البرمجية التالية مكتبة uAMQP في Python للاتصال بمركز IoT عبر ارتباط مُرسل.

import uamqp
import urllib
import uuid

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
    device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
    hostname=hostname, device_id=device_id), access_key, None)

# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)

تُدعم مسارات الارتباط التالية كعمليات الجهاز:

تم الإنشاء بواسطة نوع الارتباط مسار الارتباط ‏‏الوصف
الأجهزة ارتباط المستلم /devices/<deviceID>/messages/devicebound يتم تلقي الرسائل من السحابة إلى الجهاز الموجهة للأجهزة على هذا الارتباط من كل جهاز وجهة.
الأجهزة ارتباط المُرسل /devices/<deviceID>/messages/events تُرسل الرسائل من جهاز إلى سحابة التي تُرسل من جهاز عبر هذا الارتباط.
الأجهزة ارتباط المُرسل /messages/serviceBound/feedback ملاحظات الرسائل من السحابة إلى الجهاز المُرسلة إلى الخدمة عبر هذا الارتباط بواسطة الأجهزة.

تلقي أوامر من السحابة إلى الجهاز (عميل الجهاز)

تصل الأوامر من السحابة إلى الجهاز التي تُرسل إلى الأجهزة على ارتباط /devices/<deviceID>/messages/devicebound. يمكن للأجهزة تلقي هذه الرسائل على دفعات، واستخدام حمولة بيانات الرسالة أو خصائص الرسالة أو التعليقات التوضيحية أو خصائص التطبيق في الرسالة حسب الحاجة.

تستخدم القصاصة البرمجية التالية مكتبة uAMQP في Python) لتلقي رسائل من السحابة إلى الجهاز بواسطة جهاز.

# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
    device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')
        print(''.join(msg.get_data()))

        # Property 'to' is set to: '/devices/device1/messages/devicebound',
        print('\tto:                     ' + str(msg.properties.to))

        # Property 'message_id' is set to value provided by the service
        print('\tmessage_id:             ' + str(msg.properties.message_id))

        # Other properties are present if they were provided by the service
        print('\tcreation_time:          ' + str(msg.properties.creation_time))
        print('\tcorrelation_id:         ' +
              str(msg.properties.correlation_id))
        print('\tcontent_type:           ' + str(msg.properties.content_type))
        print('\treply_to_group_id:      ' +
              str(msg.properties.reply_to_group_id))
        print('\tsubject:                ' + str(msg.properties.subject))
        print('\tuser_id:                ' + str(msg.properties.user_id))
        print('\tgroup_sequence:         ' +
              str(msg.properties.group_sequence))
        print('\tcontent_encoding:       ' +
              str(msg.properties.content_encoding))
        print('\treply_to:               ' + str(msg.properties.reply_to))
        print('\tabsolute_expiry_time:   ' +
              str(msg.properties.absolute_expiry_time))
        print('\tgroup_id:               ' + str(msg.properties.group_id))

        # Message sequence number in the built-in event hub
        print('\tx-opt-sequence-number:  ' +
              str(msg.annotations['x-opt-sequence-number']))

إرسال رسائل بيانات تتبع الاستخدام (عميل الجهاز)

يمكنك أيضاً إرسال رسائل بيانات تتبع الاستخدام من جهاز باستخدام AMQP. يمكن للجهاز بشكل اختياري توفير قاموس من خصائص التطبيق، أو خصائص رسالة مختلفة، مثل معرّف الرسالة.

لتوجيه الرسائل استناداً إلى نص الرسالة، يجب تعيين الخاصية content_type لتكون application/json;charset=utf-8. لمعرفة المزيد حول توجيه الرسائل إما استناداً إلى خصائص الرسالة أو نص الرسالة، يرجى مراجعة وثائق بناء جملة استعلام توجيه رسالة IoT Hub.

تستخدم القصاصة البرمجية التالية مكتبة uAMQP في Python لإرسال رسائل من جهاز إلى سحابة من جهاز.

# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)

send_client.queue_message(message)
results = send_client.send_all_messages()

for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print result

ملاحظات إضافية

  • قد تُعطّل اتصالات AMQP بسبب خلل في الشبكة أو انتهاء صلاحية رمز المصادقة المميز (الذي تم إنشاؤه في التعليمات البرمجية). يجب على عميل الخدمة معالجة هذه الظروف وإعادة إنشاء الاتصال والارتباطات، إذا لزم الأمر. إذا انتهت صلاحية رمز المصادقة، فيمكن للعميل تجنب قطع الاتصال عن طريق تجديد الرمز المميز بشكل استباقي قبل انتهاء صلاحيته.

  • يجب أن يكون العميل الخاص بك أحياناً قادراً على معالجة عمليات إعادة توجيه الارتباط بشكل صحيح. لفهم مثل هذه العملية، راجع وثائق عميل AMQP.

الخطوات التالية

لمعرفة المزيد حول بروتوكول AMQP، راجع مواصفات AMQP v1.0.

لمعرفة المزيد حول مراسلة IoT Hub، راجع: