AMQP Protokolunu kullanarak IoT hub'ınız ile iletişim kurma

Azure IoT Hub, cihaza ve hizmete yönelik uç noktalar aracılığıyla çeşitli işlevler sunmak için OASIS Gelişmiş Message Queuing Protokolü (AMQP) sürüm 1.0'ı destekler. Bu belgede, IoT Hub işlevselliğini kullanmak üzere bir IoT hub'ına bağlanmak için AMQP istemcilerinin kullanımı açıklanmaktadır.

Hizmet istemcisi

IoT hub'ına (hizmet istemcisi) Bağlan ve kimlik doğrulaması yapma

AMQP kullanarak ioT hub'ına bağlanmak için istemci talep tabanlı güvenlik (CBS) veya Basit Kimlik Doğrulama ve Güvenlik Katmanı (SASL) kimlik doğrulamasını kullanabilir.

Hizmet istemcisi için aşağıdaki bilgiler gereklidir:

Bilgiler Değer
IoT hub ana bilgisayar adı <iot-hub-name>.azure-devices.net
Anahtar adı service
Erişim anahtarı Hizmetle ilişkili birincil veya ikincil anahtar
Paylaşılan erişim imzası Aşağıdaki biçimde kısa süreli paylaşılan erişim imzası: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Bu imzayı oluşturma kodunu almak için bkz . IoT Hub'a erişimi denetleme.

Aşağıdaki kod parçacığı, python'daki uAMQP kitaplığını kullanarak bir gönderen bağlantısı aracılığıyla bir IoT hub'ına bağlanır.

import uamqp
import urllib
import time

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>'  # example: '/messages/devicebound'

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)

Buluttan cihaza iletileri çağırma (hizmet istemcisi)

Hizmet ile IoT hub'ı arasında ve cihaz ile IoT hub'ı arasında buluttan cihaza ileti alışverişi hakkında bilgi edinmek için bkz . IoT hub'ınızdan buluttan cihaza iletiler gönderme. Hizmet istemcisi, aşağıdaki tabloda açıklandığı gibi, cihazlardan daha önce gönderilen iletiler için ileti göndermek ve geri bildirim almak için iki bağlantı kullanır:

Oluşturan: Bağlantı türü Bağlantı yolu Açıklama
Hizmet Gönderen bağlantısı /messages/devicebound Cihazları hedefleyen buluttan cihaza iletiler hizmet tarafından bu bağlantıya gönderilir. Bu bağlantı üzerinden gönderilen iletilerin To özelliği hedef cihazın alıcı bağlantı yoluna /devices/<deviceID>/messages/deviceboundayarlanmıştır.
Hizmet Alıcı bağlantısı /messages/serviceBound/feedback Hizmet tarafından bu bağlantıda alınan cihazlardan gelen tamamlama, reddetme ve bırakma geri bildirim iletileri. Geri bildirim iletileri hakkında daha fazla bilgi için bkz . IoT hub'ından buluttan cihaza iletiler gönderme.

Aşağıdaki kod parçacığı, Python'da uAMQP kitaplığını kullanarak buluttan cihaza ileti oluşturmayı ve bir cihaza göndermeyi gösterir.

import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full'  # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
                    application_properties=app_props)

# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()

# Close the client if it's not needed
send_client.close()

Geri bildirim almak için hizmet istemcisi bir alıcı bağlantısı oluşturur. Aşağıdaki kod parçacığı, Python'da uAMQP kitaplığını kullanarak nasıl bağlantı oluşturulacağını gösterir:

import json

operation = '/messages/serviceBound/feedback'

# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
    print('received a message')
    # Check content_type in message property to identify feedback messages coming from device
    if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
        msg_body_raw = msg.get_data()
        msg_body_str = ''.join(msg_body_raw)
        msg_body = json.loads(msg_body_str)
        print(json.dumps(msg_body, indent=2))
        print('******************')
        for feedback in msg_body:
            print('feedback received')
            print('\tstatusCode: ' + str(feedback['statusCode']))
            print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
            print('\tdeviceId: ' + str(feedback['deviceId']))
            print
    else:
        print('unknown message:', msg.properties.content_type)

Yukarıdaki kodda gösterildiği gibi, buluttan cihaza geri bildirim iletisinde uygulama/vnd.microsoft.iothub.feedback.json içerik türü bulunur. özgün iletinin teslim durumunu çıkarsamak için iletinin JSON gövdesindeki özellikleri kullanabilirsiniz:

  • Geri bildirim gövdesindeki anahtar statusCode şu değerlerden birine sahiptir: Success, Expired, DeliveryCountExceeded, Rejected veya Purged.

  • Geri bildirim gövdesindeki anahtar deviceId , hedef cihazın kimliğine sahiptir.

  • Geri bildirim gövdesindeki anahtar originalMessageId , hizmet tarafından gönderilen özgün buluttan cihaza iletisinin kimliğine sahiptir. Geri bildirimleri buluttan cihaza iletilere ilişkilendirmek için bu teslim durumunu kullanabilirsiniz.

Telemetri iletilerini alma (hizmet istemcisi)

Varsayılan olarak, IoT hub'ınız alınan cihaz telemetri iletilerini yerleşik bir olay hub'ında depolar. Hizmet istemciniz, depolanan olayları almak için AMQP Protokolü'ni kullanabilir.

Bu amaçla, hizmet istemcisinin önce IoT hub uç noktasına bağlanması ve yerleşik olay hub'larına bir yeniden yönlendirme adresi alması gerekir. Ardından hizmet istemcisi yerleşik olay hub'ına bağlanmak için sağlanan adresi kullanır.

Her adımda istemcinin aşağıdaki bilgi parçalarını sunması gerekir:

  • Geçerli hizmet kimlik bilgileri (hizmet paylaşılan erişim imzası belirteci).

  • İletileri almayı amaçlayan tüketici grubu bölümünün iyi biçimlendirilmiş yolu. Belirli bir tüketici grubu ve bölüm kimliği için yol şu biçimdedir: /messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id> (varsayılan tüketici grubudur $Default).

  • Bölümde bir başlangıç noktası belirtmek için isteğe bağlı bir filtreleme koşulu. Bu koşul bir dizi numarası, uzaklık veya sıralanmış zaman damgası biçiminde olabilir.

Aşağıdaki kod parçacığı, önceki adımları göstermek için Python'daki uAMQP kitaplığını kullanır:

import json
import uamqp
import urllib
import time

# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
    consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI


def set_endpoint_filter(uri, endpoint_filter=''):
    source_uri = uamqp.address.Source(uri)
    source_uri.set_filter(endpoint_filter)
    return source_uri


receive_client = uamqp.ReceiveClient(
    set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
    batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
    # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
    receive_client.close()

    sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
        redirect.address, policy_name, access_key)
    receive_client = uamqp.ReceiveClient(set_endpoint_filter(
        redirect.address, endpoint_filter), auth=sas_auth, debug=True)

# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
    print('*** received a message ***')
    print(''.join(msg.get_data()))
    print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
    print('\t: ' + str(msg.annotations['x-opt-offset']))
    print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

Belirli bir cihaz kimliği için IoT hub'ı, iletilerin depolanacağı bölümü belirlemek için cihaz kimliğinin karması kullanır. Yukarıdaki kod parçacığında olayların böyle tek bir bölümden nasıl alındığı gösterilmektedir. Ancak, tipik bir uygulamanın genellikle tüm olay hub'ı bölümlerinde depolanan olayları alması gerektiğini unutmayın.

Cihaz istemcisi

IoT hub'ına (cihaz istemcisi) Bağlan ve kimlik doğrulaması yapma

AmQP kullanarak bir IoT hub'ına bağlanmak için cihaz talep tabanlı güvenlik (CBS) veya Basit Kimlik Doğrulaması ve Güvenlik Katmanı (SASL) kimlik doğrulamasını kullanabilir.

Cihaz istemcisi için aşağıdaki bilgiler gereklidir:

Bilgiler Değer
IoT hub ana bilgisayar adı <iot-hub-name>.azure-devices.net
Erişim anahtarı Cihazla ilişkili birincil veya ikincil anahtar
Paylaşılan erişim imzası Aşağıdaki biçimde kısa süreli paylaşılan erişim imzası: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Bu imzayı oluşturma kodunu almak için bkz . IoT Hub'a erişimi denetleme.

Aşağıdaki kod parçacığı, python'daki uAMQP kitaplığını kullanarak bir gönderen bağlantısı aracılığıyla bir IoT hub'ına bağlanır.

import uamqp
import urllib
import uuid

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
    device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
    hostname=hostname, device_id=device_id), access_key, None)

# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)

Cihaz işlemleri olarak aşağıdaki bağlantı yolları desteklenir:

Oluşturan: Bağlantı türü Bağlantı yolu Açıklama
Cihazlar Alıcı bağlantısı /devices/<deviceID>/messages/devicebound Cihazları hedefleyen buluttan cihaza iletiler her hedef cihaz tarafından bu bağlantıda alınır.
Cihazlar Gönderen bağlantısı /devices/<deviceID>/messages/events Bir cihazdan gönderilen cihazdan buluta iletiler bu bağlantı üzerinden gönderilir.
Cihazlar Gönderen bağlantısı /messages/serviceBound/feedback Cihazlara göre bu bağlantı üzerinden hizmete gönderilen buluttan cihaza ileti geri bildirimi.

Buluttan cihaza komutları alma (cihaz istemcisi)

Cihazlara gönderilen buluttan cihaza komutları bir /devices/<deviceID>/messages/devicebound bağlantıya ulaşır. Cihazlar bu iletileri toplu olarak alabilir ve gerektiğinde iletideki ileti veri yükünü, ileti özelliklerini, ek açıklamaları veya uygulama özelliklerini kullanabilir.

Aşağıdaki kod parçacığı, bir cihaz tarafından buluttan cihaza iletileri almak için Python'daki uAMQP kitaplığını kullanır.

# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
    device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')
        print(''.join(msg.get_data()))

        # Property 'to' is set to: '/devices/device1/messages/devicebound',
        print('\tto:                     ' + str(msg.properties.to))

        # Property 'message_id' is set to value provided by the service
        print('\tmessage_id:             ' + str(msg.properties.message_id))

        # Other properties are present if they were provided by the service
        print('\tcreation_time:          ' + str(msg.properties.creation_time))
        print('\tcorrelation_id:         ' +
              str(msg.properties.correlation_id))
        print('\tcontent_type:           ' + str(msg.properties.content_type))
        print('\treply_to_group_id:      ' +
              str(msg.properties.reply_to_group_id))
        print('\tsubject:                ' + str(msg.properties.subject))
        print('\tuser_id:                ' + str(msg.properties.user_id))
        print('\tgroup_sequence:         ' +
              str(msg.properties.group_sequence))
        print('\tcontent_encoding:       ' +
              str(msg.properties.content_encoding))
        print('\treply_to:               ' + str(msg.properties.reply_to))
        print('\tabsolute_expiry_time:   ' +
              str(msg.properties.absolute_expiry_time))
        print('\tgroup_id:               ' + str(msg.properties.group_id))

        # Message sequence number in the built-in event hub
        print('\tx-opt-sequence-number:  ' +
              str(msg.annotations['x-opt-sequence-number']))

Telemetri iletileri gönderme (cihaz istemcisi)

Ayrıca AMQP kullanarak bir cihazdan telemetri iletileri de gönderebilirsiniz. Cihaz isteğe bağlı olarak uygulama özellikleri sözlüğü veya ileti kimliği gibi çeşitli ileti özellikleri sağlayabilir.

İletileri ileti gövdesine göre yönlendirmek için özelliğini olarak application/json;charset=utf-8ayarlamanız content_type gerekir. İletileri ileti özelliklerine veya ileti gövdesine göre yönlendirme hakkında daha fazla bilgi edinmek için lütfen IoT Hub ileti yönlendirme sorgusu söz dizimi belgelerine bakın.

Aşağıdaki kod parçacığı, cihazdan buluta iletiler göndermek için Python'daki uAMQP kitaplığını kullanır.

# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)

send_client.queue_message(message)
results = send_client.send_all_messages()

for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print result

Ek notlar

  • AmQP bağlantıları, bir ağ hatası veya kimlik doğrulama belirtecinin süresinin dolması (kodda oluşturulur) nedeniyle kesintiye uğrayabilir. Hizmet istemcisinin bu koşulları işlemesi ve gerekirse bağlantıyı ve bağlantıları yeniden oluşturması gerekir. Kimlik doğrulama belirtecinin süresi dolarsa istemci, süresi dolmadan önce belirteci proaktif olarak yenileyerek bağlantının bırakılmasından kaçınabilir.

  • İstemcinizin bazen bağlantı yeniden yönlendirmelerini doğru işleyebilmesi gerekir. Böyle bir işlemi anlamak için AMQP istemci belgelerinize bakın.

Sonraki adımlar

AMQP Protokolü hakkında daha fazla bilgi edinmek için bkz . AMQP v1.0 belirtimi.

IoT Hub mesajlaşması hakkında daha fazla bilgi edinmek için bkz: