Komunikacja z centrum IoT przy użyciu protokołu AMQP

Usługa Azure IoT Hub obsługuje protokół 1.0 Advanced Message Queuing Protocol (AMQP) o nazwie OASIS Advanced Message Queuing Protocol w wersji 1.0 w celu dostarczania różnych funkcji za pośrednictwem punktów końcowych dostępnych dla urządzeń i usług. W tym dokumencie opisano używanie klientów amQP do nawiązywania połączenia z centrum IoT w celu korzystania z funkcji usługi IoT Hub.

Klient usługi

Połączenie i uwierzytelnianie w centrum IoT (klient usługi)

Aby nawiązać połączenie z centrum IoT przy użyciu protokołu AMQP, klient może użyć uwierzytelniania opartego na oświadczeniach (CBS) lub prostego uwierzytelniania i warstwy zabezpieczeń (SASL).

Następujące informacje są wymagane dla klienta usługi:

Informacja Wartość
Nazwa hosta centrum IoT <iot-hub-name>.azure-devices.net
Nazwa klucza service
Klucz dostępu Klucz podstawowy lub pomocniczy skojarzony z usługą
Sygnatura dostępu współdzielonego Krótkotrwały sygnatura dostępu współdzielonego w następującym formacie: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Aby uzyskać kod generowania tego podpisu, zobacz Kontrola dostępu do usługi IoT Hub.

Poniższy fragment kodu używa biblioteki uAMQP w języku Python do nawiązywania połączenia z centrum IoT Hub za pośrednictwem linku nadawcy.

import uamqp
import urllib
import time

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>'  # example: '/messages/devicebound'

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)

Wywoływanie komunikatów chmura-urządzenie (klient usługi)

Aby dowiedzieć się więcej na temat wymiany komunikatów z chmury na urządzenie między usługą a centrum IoT i między urządzeniem a centrum IoT, zobacz Wysyłanie komunikatów z chmury do urządzenia z centrum IoT. Klient usługi używa dwóch linków do wysyłania komunikatów i odbierania opinii dotyczących wcześniej wysyłanych komunikatów z urządzeń, zgodnie z opisem w poniższej tabeli:

Utworzone przez Typ linku Ścieżka łącza opis
Usługa Link nadawcy /messages/devicebound Komunikaty z chmury do urządzenia przeznaczone dla urządzeń są wysyłane do tego linku przez usługę. Komunikaty wysyłane za pośrednictwem tego linku mają właściwość To ustawioną na ścieżkę łącza odbiorcy urządzenia docelowego, /devices/<deviceID>/messages/devicebound.
Usługa Link odbiorcy /messages/serviceBound/feedback Komunikaty zwrotne dotyczące uzupełniania, odrzucania i porzucania, które pochodzą z urządzeń odebranych na tym linku przez usługę. Aby uzyskać więcej informacji na temat komunikatów opinii, zobacz Wysyłanie komunikatów z chmury do urządzenia z centrum IoT Hub.

Poniższy fragment kodu przedstawia sposób tworzenia komunikatu z chmury do urządzenia i wysyłania go do urządzenia przy użyciu biblioteki uAMQP w języku Python.

import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full'  # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
                    application_properties=app_props)

# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()

# Close the client if it's not needed
send_client.close()

Aby otrzymywać opinie, klient usługi tworzy link odbiorcy. Poniższy fragment kodu przedstawia sposób tworzenia linku przy użyciu biblioteki uAMQP w języku Python:

import json

operation = '/messages/serviceBound/feedback'

# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
    print('received a message')
    # Check content_type in message property to identify feedback messages coming from device
    if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
        msg_body_raw = msg.get_data()
        msg_body_str = ''.join(msg_body_raw)
        msg_body = json.loads(msg_body_str)
        print(json.dumps(msg_body, indent=2))
        print('******************')
        for feedback in msg_body:
            print('feedback received')
            print('\tstatusCode: ' + str(feedback['statusCode']))
            print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
            print('\tdeviceId: ' + str(feedback['deviceId']))
            print
    else:
        print('unknown message:', msg.properties.content_type)

Jak pokazano w poprzednim kodzie, komunikat opinii z chmury do urządzenia ma typ zawartości aplikacji/vnd.microsoft.iothub.feedback.json. Właściwości w treści JSON wiadomości można użyć, aby wywnioskować stan dostarczania oryginalnej wiadomości:

  • Klucz statusCode w treści opinii ma jedną z następujących wartości: Powodzenie, Wygaśnięcie, DeliveryCountExceeded, Odrzucone lub Przeczyszczone.

  • Klucz deviceId w treści opinii ma identyfikator urządzenia docelowego.

  • Klucz originalMessageId w treści opinii ma identyfikator oryginalnego komunikatu z chmury do urządzenia, który został wysłany przez usługę. Ten stan dostarczania umożliwia skorelowanie opinii z komunikatami z chmury do urządzenia.

Odbieranie komunikatów telemetrycznych (klient usługi)

Domyślnie centrum IoT przechowuje pozyskane komunikaty telemetryczne urządzenia w wbudowanym centrum zdarzeń. Klient usługi może odbierać przechowywane zdarzenia przy użyciu protokołu AMQP.

W tym celu klient usługi musi najpierw nawiązać połączenie z punktem końcowym centrum IoT Hub i otrzymać adres przekierowania do wbudowanych centrów zdarzeń. Następnie klient usługi używa podanego adresu, aby nawiązać połączenie z wbudowanym centrum zdarzeń.

W każdym kroku klient musi przedstawić następujące informacje:

  • Prawidłowe poświadczenia usługi (token sygnatury dostępu współdzielonego usługi).

  • Dobrze sformatowana ścieżka do partycji grupy odbiorców, z którą zamierza pobierać komunikaty. W przypadku danej grupy odbiorców i identyfikatora partycji ścieżka ma następujący format: /messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id> (domyślna grupa odbiorców to $Default).

  • Opcjonalny predykat filtrowania w celu wyznaczenia punktu początkowego w partycji. Ten predykat może być w postaci numeru sekwencji, przesunięcia lub w kolejce znacznika czasu.

Poniższy fragment kodu używa biblioteki uAMQP w języku Python do zademonstrowania powyższych kroków:

import json
import uamqp
import urllib
import time

# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
    consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI


def set_endpoint_filter(uri, endpoint_filter=''):
    source_uri = uamqp.address.Source(uri)
    source_uri.set_filter(endpoint_filter)
    return source_uri


receive_client = uamqp.ReceiveClient(
    set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
    batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
    # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
    receive_client.close()

    sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
        redirect.address, policy_name, access_key)
    receive_client = uamqp.ReceiveClient(set_endpoint_filter(
        redirect.address, endpoint_filter), auth=sas_auth, debug=True)

# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
    print('*** received a message ***')
    print(''.join(msg.get_data()))
    print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
    print('\t: ' + str(msg.annotations['x-opt-offset']))
    print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

W przypadku danego identyfikatora urządzenia centrum IoT używa skrótu identyfikatora urządzenia, aby określić, w której partycji mają być przechowywane komunikaty. Powyższy fragment kodu pokazuje, jak zdarzenia są odbierane z jednej takiej partycji. Należy jednak pamiętać, że typowa aplikacja często musi pobierać zdarzenia przechowywane we wszystkich partycjach centrum zdarzeń.

Klient urządzenia

Połączenie i uwierzytelnianie w centrum IoT (klient urządzenia)

Aby nawiązać połączenie z centrum IoT przy użyciu protokołu AMQP, urządzenie może używać oświadczeń opartych na zabezpieczeniach (CBS) lub uwierzytelniania Simple Authentication and Security Layer (SASL).

Następujące informacje są wymagane dla klienta urządzenia:

Informacja Wartość
Nazwa hosta centrum IoT <iot-hub-name>.azure-devices.net
Klucz dostępu Klucz podstawowy lub pomocniczy skojarzony z urządzeniem
Sygnatura dostępu współdzielonego Krótkotrwały sygnatura dostępu współdzielonego w następującym formacie: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Aby uzyskać kod generowania tego podpisu, zobacz Kontrola dostępu do usługi IoT Hub.

Poniższy fragment kodu używa biblioteki uAMQP w języku Python do nawiązywania połączenia z centrum IoT Hub za pośrednictwem linku nadawcy.

import uamqp
import urllib
import uuid

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
    device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
    hostname=hostname, device_id=device_id), access_key, None)

# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)

Następujące ścieżki linków są obsługiwane jako operacje urządzeń:

Utworzone przez Typ linku Ścieżka łącza opis
Urządzenia Link odbiorcy /devices/<deviceID>/messages/devicebound Komunikaty z chmury do urządzenia przeznaczone dla urządzeń są odbierane na tym linku przez każde urządzenie docelowe.
Urządzenia Link nadawcy /devices/<deviceID>/messages/events Komunikaty przesyłane z urządzenia do chmury wysyłane z urządzenia są wysyłane za pośrednictwem tego linku.
Urządzenia Link nadawcy /messages/serviceBound/feedback Opinie dotyczące komunikatów z chmury do urządzenia wysyłane do usługi za pośrednictwem tego linku przez urządzenia.

Odbieranie poleceń z chmury do urządzenia (klient urządzenia)

Polecenia z chmury do urządzenia wysyłane do urządzeń są dostarczane za pomocą linku /devices/<deviceID>/messages/devicebound . Urządzenia mogą odbierać te komunikaty w partiach i używać ładunku danych komunikatu, właściwości komunikatów, adnotacji lub właściwości aplikacji w wiadomości zgodnie z potrzebami.

Poniższy fragment kodu używa biblioteki uAMQP w języku Python do odbierania komunikatów z chmury do urządzenia.

# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
    device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')
        print(''.join(msg.get_data()))

        # Property 'to' is set to: '/devices/device1/messages/devicebound',
        print('\tto:                     ' + str(msg.properties.to))

        # Property 'message_id' is set to value provided by the service
        print('\tmessage_id:             ' + str(msg.properties.message_id))

        # Other properties are present if they were provided by the service
        print('\tcreation_time:          ' + str(msg.properties.creation_time))
        print('\tcorrelation_id:         ' +
              str(msg.properties.correlation_id))
        print('\tcontent_type:           ' + str(msg.properties.content_type))
        print('\treply_to_group_id:      ' +
              str(msg.properties.reply_to_group_id))
        print('\tsubject:                ' + str(msg.properties.subject))
        print('\tuser_id:                ' + str(msg.properties.user_id))
        print('\tgroup_sequence:         ' +
              str(msg.properties.group_sequence))
        print('\tcontent_encoding:       ' +
              str(msg.properties.content_encoding))
        print('\treply_to:               ' + str(msg.properties.reply_to))
        print('\tabsolute_expiry_time:   ' +
              str(msg.properties.absolute_expiry_time))
        print('\tgroup_id:               ' + str(msg.properties.group_id))

        # Message sequence number in the built-in event hub
        print('\tx-opt-sequence-number:  ' +
              str(msg.annotations['x-opt-sequence-number']))

Wysyłanie komunikatów telemetrycznych (klient urządzenia)

Możesz również wysyłać komunikaty telemetryczne z urządzenia przy użyciu protokołu AMQP. Urządzenie może opcjonalnie podać słownik właściwości aplikacji lub różne właściwości komunikatu, takie jak identyfikator komunikatu.

Aby kierować komunikaty na podstawie treści komunikatu content_type , należy ustawić właściwość na application/json;charset=utf-8. Aby dowiedzieć się więcej na temat routingu komunikatów na podstawie właściwości komunikatów lub treści komunikatów, zobacz dokumentację składni zapytania routingu komunikatów usługi IoT Hub.

Poniższy fragment kodu używa biblioteki uAMQP w języku Python do wysyłania komunikatów z urządzenia do chmury z urządzenia.

# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)

send_client.queue_message(message)
results = send_client.send_all_messages()

for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print result

Dodatkowe uwagi

  • Połączenia AMQP mogą być zakłócane z powodu usterki sieci lub wygaśnięcia tokenu uwierzytelniania (wygenerowanego w kodzie). Klient usługi musi obsługiwać te okoliczności i w razie potrzeby ponownie opublikować połączenie i łącza. Jeśli token uwierzytelniania wygaśnie, klient może uniknąć porzucania połączenia, proaktywnie odnawiając token przed jego wygaśnięciem.

  • Klient musi od czasu do czasu mieć możliwość poprawnego obsługi przekierowań linków. Aby zrozumieć taką operację, zapoznaj się z dokumentacją klienta protokołu AMQP.

Następne kroki

Aby dowiedzieć się więcej o protokole AMQP, zobacz specyfikację protokołu AMQP w wersji 1.0.

Aby dowiedzieć się więcej o komunikatach usługi IoT Hub, zobacz: