Comunique-se com seu hub IoT usando o protocolo AMQP
O Hub IoT do Azure dá suporte ao OASIS Advanced Message Queuing Protocol (AMQP) versão 1.0 para fornecer uma variedade de funcionalidades por meio de pontos de extremidade voltados para dispositivos e serviços. Este documento descreve o uso de clientes AMQP para se conectar a um hub IoT para usar a funcionalidade do Hub IoT.
Cliente de serviço
Conectar-se e autenticar-se em um hub IoT (cliente de serviço)
Para se conectar a um hub IoT usando AMQP, um cliente pode usar a autenticação CBS (segurança baseada em declarações) ou SASL (Simple Authentication and Security Layer).
As seguintes informações são necessárias para o cliente de serviço:
Informação | Value |
---|---|
Nome do host do hub IoT | <iot-hub-name>.azure-devices.net |
Nome da chave | service |
Chave de acesso | Uma chave primária ou secundária associada ao serviço |
Assinatura de acesso partilhado | Uma assinatura de acesso compartilhado de curta duração no seguinte formato: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Para obter o código para gerar essa assinatura, consulte Controlar o acesso ao Hub IoT. |
O trecho de código a seguir usa a biblioteca uAMQP em Python para se conectar a um hub IoT por meio de um link de remetente.
import uamqp
import urllib
import time
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>' # example: '/messages/devicebound'
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)
Invocar mensagens da nuvem para o dispositivo (cliente de serviço)
Para saber mais sobre a troca de mensagens da nuvem para o dispositivo entre o serviço e o hub IoT e entre o dispositivo e o hub IoT, consulte Enviar mensagens da nuvem para o dispositivo do seu hub IoT. O cliente de serviço usa dois links para enviar mensagens e receber feedback sobre mensagens enviadas anteriormente de dispositivos, conforme descrito na tabela a seguir:
Criado por | Tipo de ligação | Caminho do link | Description |
---|---|---|---|
Serviço | Link do remetente | /messages/devicebound |
As mensagens de nuvem para dispositivo destinadas a dispositivos são enviadas para este link pelo serviço. As mensagens enviadas através deste link têm sua To propriedade definida para o caminho do link do recetor do dispositivo de destino, /devices/<deviceID>/messages/devicebound . |
Serviço | Link do recetor | /messages/serviceBound/feedback |
Mensagens de feedback de conclusão, rejeição e abandono que vêm de dispositivos recebidos neste link pelo serviço. Para obter mais informações sobre mensagens de feedback, consulte Enviar mensagens da nuvem para o dispositivo de um hub IoT. |
O trecho de código a seguir demonstra como criar uma mensagem de nuvem para dispositivo e enviá-la para um dispositivo usando a biblioteca uAMQP em Python.
import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full' # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
application_properties=app_props)
# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()
# Close the client if it's not needed
send_client.close()
Para receber feedback, o cliente de serviço cria um link de recetor. O trecho de código a seguir demonstra como criar um link usando a biblioteca uAMQP em Python:
import json
operation = '/messages/serviceBound/feedback'
# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
print('received a message')
# Check content_type in message property to identify feedback messages coming from device
if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
msg_body_raw = msg.get_data()
msg_body_str = ''.join(msg_body_raw)
msg_body = json.loads(msg_body_str)
print(json.dumps(msg_body, indent=2))
print('******************')
for feedback in msg_body:
print('feedback received')
print('\tstatusCode: ' + str(feedback['statusCode']))
print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
print('\tdeviceId: ' + str(feedback['deviceId']))
print
else:
print('unknown message:', msg.properties.content_type)
Como mostrado no código anterior, uma mensagem de feedback da nuvem para o dispositivo tem um tipo de conteúdo de aplicativo/vnd.microsoft.iothub.feedback.json. Você pode usar as propriedades no corpo JSON da mensagem para inferir o status de entrega da mensagem original:
A chave
statusCode
no corpo do feedback tem um dos seguintes valores: Success, Expired, DeliveryCountExceeded, Rejected ou Pured.A chave
deviceId
no corpo de feedback tem o ID do dispositivo de destino.A chave
originalMessageId
no corpo do feedback tem o ID da mensagem original da nuvem para o dispositivo que foi enviada pelo serviço. Você pode usar esse status de entrega para correlacionar comentários a mensagens da nuvem para o dispositivo.
Receber mensagens de telemetria (cliente de serviço)
Por padrão, o hub IoT armazena mensagens de telemetria de dispositivo ingeridas em um hub de eventos interno. Seu cliente de serviço pode usar o protocolo AMQP para receber os eventos armazenados.
Para isso, o cliente de serviço primeiro precisa se conectar ao ponto de extremidade do hub IoT e receber um endereço de redirecionamento para os hubs de eventos internos. Em seguida, o cliente de serviço usa o endereço fornecido para se conectar ao hub de eventos interno.
Em cada etapa, o cliente precisa apresentar as seguintes informações:
Credenciais de serviço válidas (token de assinatura de acesso compartilhado do serviço).
Um caminho bem formatado para a partição do grupo de consumidores da qual pretende recuperar mensagens. Para um determinado grupo de consumidores e ID de partição, o caminho tem o seguinte formato:
/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>
(o grupo de consumidores padrão é$Default
).Um predicado de filtragem opcional para designar um ponto de partida na partição. Esse predicado pode estar na forma de um número de sequência, deslocamento ou carimbo de data/hora enfileirado.
O trecho de código a seguir usa a biblioteca uAMQP em Python para demonstrar as etapas anteriores:
import json
import uamqp
import urllib
import time
# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
consumer_group='$Default', p_id=0)
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'
# Helper function to set the filtering predicate on the source URI
def set_endpoint_filter(uri, endpoint_filter=''):
source_uri = uamqp.address.Source(uri)
source_uri.set_filter(endpoint_filter)
return source_uri
receive_client = uamqp.ReceiveClient(
set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
receive_client.close()
sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
redirect.address, policy_name, access_key)
receive_client = uamqp.ReceiveClient(set_endpoint_filter(
redirect.address, endpoint_filter), auth=sas_auth, debug=True)
# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
print('\t: ' + str(msg.annotations['x-opt-offset']))
print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))
Para um determinado ID de dispositivo, o hub IoT utiliza um hash do ID do dispositivo para determinar em que partição deve armazenar as mensagens. O trecho de código anterior demonstra como os eventos são recebidos de uma única partição. No entanto, observe que um aplicativo típico geralmente precisa recuperar eventos armazenados em todas as partições do hub de eventos.
Cliente do dispositivo
Conectar-se e autenticar-se em um hub IoT (cliente de dispositivo)
Para se conectar a um hub IoT usando AMQP, um dispositivo pode usar a segurança baseada em declarações (CBS) ou a autenticação SASL (Simple Authentication and Security Layer).
As seguintes informações são necessárias para o cliente do dispositivo:
Informação | Value |
---|---|
Nome do host do hub IoT | <iot-hub-name>.azure-devices.net |
Chave de acesso | Uma chave primária ou secundária associada ao dispositivo |
Assinatura de acesso partilhado | Uma assinatura de acesso compartilhado de curta duração no seguinte formato: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Para obter o código para gerar essa assinatura, consulte Controlar o acesso ao Hub IoT. |
O trecho de código a seguir usa a biblioteca uAMQP em Python para se conectar a um hub IoT por meio de um link de remetente.
import uamqp
import urllib
import uuid
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
hostname=hostname, device_id=device_id), access_key, None)
# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)
Os seguintes caminhos de link são suportados como operações de dispositivo:
Criado por | Tipo de ligação | Caminho do link | Description |
---|---|---|---|
Dispositivos | Link do recetor | /devices/<deviceID>/messages/devicebound |
As mensagens de nuvem para dispositivo destinadas a dispositivos são recebidas neste link por cada dispositivo de destino. |
Dispositivos | Link do remetente | /devices/<deviceID>/messages/events |
As mensagens do dispositivo para a nuvem enviadas a partir de um dispositivo são enviadas através deste link. |
Dispositivos | Link do remetente | /messages/serviceBound/feedback |
Feedback de mensagem da nuvem para o dispositivo enviado ao serviço através deste link por dispositivos. |
Receber comandos da nuvem para o dispositivo (cliente do dispositivo)
Os comandos de nuvem para dispositivo que são enviados aos dispositivos chegam em um /devices/<deviceID>/messages/devicebound
link. Os dispositivos podem receber essas mensagens em lotes e usar a carga útil de dados da mensagem, as propriedades da mensagem, as anotações ou as propriedades do aplicativo na mensagem, conforme necessário.
O trecho de código a seguir usa a biblioteca uAMQP em Python) para receber mensagens da nuvem para o dispositivo por um dispositivo.
# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
# Property 'to' is set to: '/devices/device1/messages/devicebound',
print('\tto: ' + str(msg.properties.to))
# Property 'message_id' is set to value provided by the service
print('\tmessage_id: ' + str(msg.properties.message_id))
# Other properties are present if they were provided by the service
print('\tcreation_time: ' + str(msg.properties.creation_time))
print('\tcorrelation_id: ' +
str(msg.properties.correlation_id))
print('\tcontent_type: ' + str(msg.properties.content_type))
print('\treply_to_group_id: ' +
str(msg.properties.reply_to_group_id))
print('\tsubject: ' + str(msg.properties.subject))
print('\tuser_id: ' + str(msg.properties.user_id))
print('\tgroup_sequence: ' +
str(msg.properties.group_sequence))
print('\tcontent_encoding: ' +
str(msg.properties.content_encoding))
print('\treply_to: ' + str(msg.properties.reply_to))
print('\tabsolute_expiry_time: ' +
str(msg.properties.absolute_expiry_time))
print('\tgroup_id: ' + str(msg.properties.group_id))
# Message sequence number in the built-in event hub
print('\tx-opt-sequence-number: ' +
str(msg.annotations['x-opt-sequence-number']))
Enviar mensagens de telemetria (cliente do dispositivo)
Você também pode enviar mensagens de telemetria de um dispositivo usando AMQP. O dispositivo pode, opcionalmente, fornecer um dicionário de propriedades do aplicativo ou várias propriedades da mensagem, como ID da mensagem.
Para rotear mensagens com base no corpo da mensagem, você deve definir a content_type
propriedade como application/json;charset=utf-8
. Para saber mais sobre o roteamento de mensagens com base nas propriedades da mensagem ou no corpo da mensagem, consulte a documentação da sintaxe da consulta de roteamento de mensagens do Hub IoT.
O trecho de código a seguir usa a biblioteca uAMQP em Python para enviar mensagens do dispositivo para a nuvem de um dispositivo.
# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)
send_client = uamqp.SendClient(uri, debug=True)
# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None
# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }
# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)
send_client.queue_message(message)
results = send_client.send_all_messages()
for result in results:
if result == uamqp.constants.MessageState.SendFailed:
print result
Notas adicionais
As conexões AMQP podem ser interrompidas devido a uma falha de rede ou à expiração do token de autenticação (gerado no código). O cliente de serviço deve lidar com essas circunstâncias e restabelecer a conexão e os links, se necessário. Se um token de autenticação expirar, o cliente poderá evitar uma queda de conexão renovando proativamente o token antes de sua expiração.
Seu cliente deve ocasionalmente ser capaz de lidar com redirecionamentos de link corretamente. Para entender essa operação, consulte a documentação do cliente AMQP.
Próximos passos
Para saber mais sobre o protocolo AMQP, consulte a especificação AMQP v1.0.
Para saber mais sobre as mensagens do Hub IoT, consulte: