Comunique com o seu hub IoT utilizando o Protocolo AMQP
Hub IoT do Azure suporta a versão 1.0 do OASIS Advanced Message Queuing Protocol (AMQP) para fornecer uma variedade de funcionalidades através de pontos finais voltados para dispositivos e virados para o serviço. Este documento descreve o uso de clientes AMQP para se conectarem a um hub IoT para usar Hub IoT funcionalidade.
Cliente de serviço
Conecte-se e autença-se com um hub IoT (cliente de serviço)
Para se conectar a um hub IoT utilizando amQP, um cliente pode usar a autenticação de segurança baseada em sinistros (CBS) ou simples autenticação e camada de segurança (SASL).
São necessárias as seguintes informações para o cliente de serviço:
Informações | Valor |
---|---|
Nome anfitrião do hub IoT | <iot-hub-name>.azure-devices.net |
Nome da chave | service |
Chave de acesso | Uma chave primária ou secundária que está associada ao serviço |
Assinatura de acesso partilhado | Uma assinatura de acesso partilhado de curta duração no seguinte formato: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Para obter o código para gerar esta assinatura, consulte o Controlo de acesso a Hub IoT. |
O seguinte corte de código utiliza a biblioteca uAMQP em Python para ligar a um hub IoT através de uma ligação de remetente.
import uamqp
import urllib
import time
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>' # example: '/messages/devicebound'
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)
Invocar mensagens nuvem-dispositivo (cliente de serviço)
Para saber mais sobre a troca de mensagens nuvem-para-dispositivo entre o serviço e o hub IoT e entre o dispositivo e o hub IoT, consulte Enviar mensagens nuvem-dispositivo a partir do seu hub IoT. O cliente de serviço utiliza dois links para enviar mensagens e receber feedback para mensagens previamente enviadas a partir de dispositivos, conforme descrito na tabela seguinte:
Criado por | Tipo de ligação | Caminho de ligação | Description |
---|---|---|---|
Serviço | Ligação remetente | /messages/devicebound |
As mensagens cloud-to-device que se destinam a dispositivos são enviadas para este link pelo serviço. As mensagens enviadas através deste link têm a sua To propriedade definida para o caminho de ligação do recetor do dispositivo alvo, /devices/<deviceID>/messages/devicebound . |
Serviço | Ligação do recetor | /messages/serviceBound/feedback |
Conclusão, rejeição e abandono de mensagens de feedback que vêm de dispositivos recebidos neste link por serviço. Para obter mais informações sobre mensagens de feedback, consulte Enviar mensagens cloud-to-device a partir de um hub IoT. |
O seguinte código snippet demonstra como criar uma mensagem nuvem-para-dispositivo e enviá-la para um dispositivo utilizando a biblioteca uAMQP em Python.
import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full' # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
application_properties=app_props)
# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()
# Close the client if it's not needed
send_client.close()
Para receber feedback, o cliente de serviço cria uma ligação recetora. O seguinte corte de código demonstra como criar um link utilizando a biblioteca uAMQP em Python:
import json
operation = '/messages/serviceBound/feedback'
# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
print('received a message')
# Check content_type in message property to identify feedback messages coming from device
if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
msg_body_raw = msg.get_data()
msg_body_str = ''.join(msg_body_raw)
msg_body = json.loads(msg_body_str)
print(json.dumps(msg_body, indent=2))
print('******************')
for feedback in msg_body:
print('feedback received')
print('\tstatusCode: ' + str(feedback['statusCode']))
print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
print('\tdeviceId: ' + str(feedback['deviceId']))
print
else:
print('unknown message:', msg.properties.content_type)
Como mostrado no código anterior, uma mensagem de feedback nuvem-a-dispositivo tem um tipo de conteúdo de aplicação/vnd.microsoft.iothub.feedback.json. Pode utilizar as propriedades no corpo JSON da mensagem para inferir o estado de entrega da mensagem original:
A chave
statusCode
no corpo de feedback tem um dos seguintes valores: Sucesso, Expirado, DeliveryCountExceeded, Rejected ou Purgado.A chave
deviceId
no corpo de feedback tem a identificação do dispositivo alvo.A chave
originalMessageId
no corpo de comentários tem o ID da mensagem original nuvem-para-dispositivo que foi enviada pelo serviço. Pode utilizar este estado de entrega para correlacionar o feedback com mensagens cloud-to-device.
Receber mensagens de telemetria (cliente de serviço)
Por padrão, o seu hub IoT armazena mensagens de telemetria de dispositivo ingeridas num centro de eventos incorporado. O seu cliente de serviço pode utilizar o Protocolo AMQP para receber os eventos armazenados.
Para o efeito, o cliente de serviço precisa primeiro de se ligar ao ponto final do hub IoT e receber um endereço de redirecionamento para os centros de eventos incorporados. O cliente de serviço utiliza então o endereço fornecido para ligar ao centro de eventos incorporado.
Em cada passo, o cliente necessita de apresentar as seguintes informações:
Credenciais de serviço válidas (ficha de assinatura de acesso partilhado de serviço).
Um caminho bem formatado para a divisão do grupo de consumidores de que pretende recuperar mensagens. Para um determinado grupo de consumidores e ID de partição, o caminho tem o seguinte formato:
/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>
(o grupo de consumidores predefinido é$Default
).Um predicado de filtragem opcional para designar um ponto de partida na partição. Este predicado pode ser na forma de um número de sequência, offset ou tempotamp enraed.
O seguinte código corta-código utiliza a biblioteca uAMQP em Python para demonstrar os passos anteriores:
import json
import uamqp
import urllib
import time
# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
consumer_group='$Default', p_id=0)
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'
# Helper function to set the filtering predicate on the source URI
def set_endpoint_filter(uri, endpoint_filter=''):
source_uri = uamqp.address.Source(uri)
source_uri.set_filter(endpoint_filter)
return source_uri
receive_client = uamqp.ReceiveClient(
set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
receive_client.close()
sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
redirect.address, policy_name, access_key)
receive_client = uamqp.ReceiveClient(set_endpoint_filter(
redirect.address, endpoint_filter), auth=sas_auth, debug=True)
# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
print('\t: ' + str(msg.annotations['x-opt-offset']))
print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))
Para um determinado ID de dispositivo, o hub IoT utiliza um hash do ID do dispositivo para determinar em que partição deve armazenar as mensagens. O corte de código anterior demonstra como os eventos são recebidos de uma única divisão. No entanto, note que uma aplicação típica precisa muitas vezes de recuperar eventos que são armazenados em todas as divisórias do centro de eventos.
Cliente do dispositivo
Conecte-se e autença-se a um hub IoT (cliente do dispositivo)
Para se ligar a um hub IoT utilizando AMQP, um dispositivo pode utilizar a autenticação baseada em sinistros (CBS) ou simples autenticação e camada de segurança (SASL).
São necessárias as seguintes informações para o cliente do dispositivo:
Informações | Valor |
---|---|
Nome anfitrião do hub IoT | <iot-hub-name>.azure-devices.net |
Chave de acesso | Uma chave primária ou secundária que está associada ao dispositivo |
Assinatura de acesso partilhado | Uma assinatura de acesso partilhado de curta duração no seguinte formato: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Para obter o código para gerar esta assinatura, consulte o Controlo de acesso a Hub IoT. |
O seguinte corte de código utiliza a biblioteca uAMQP em Python para ligar a um hub IoT através de uma ligação de remetente.
import uamqp
import urllib
import uuid
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
hostname=hostname, device_id=device_id), access_key, None)
# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)
Os seguintes caminhos de ligação são suportados como operações do dispositivo:
Criado por | Tipo de ligação | Caminho de ligação | Description |
---|---|---|---|
Dispositivos | Ligação do recetor | /devices/<deviceID>/messages/devicebound |
As mensagens cloud-to-device que se destinam a dispositivos são recebidas nesta ligação por cada dispositivo de destino. |
Dispositivos | Ligação remetente | /devices/<deviceID>/messages/events |
As mensagens dispositivo-nuvem que são enviadas a partir de um dispositivo são enviadas por este link. |
Dispositivos | Ligação remetente | /messages/serviceBound/feedback |
Feedback de mensagem nuvem-para-dispositivo enviado ao serviço através deste link por dispositivos. |
Receber comandos nuvem-a-dispositivo (cliente do dispositivo)
Os comandos nuvem-dispositivo que são enviados para os dispositivos chegam a um /devices/<deviceID>/messages/devicebound
link. Os dispositivos podem receber estas mensagens em lotes e utilizar a carga útil de dados de mensagens, propriedades de mensagens, anotações ou propriedades de aplicação na mensagem, conforme necessário.
O seguinte código snippet utiliza a biblioteca uAMQP em Python) para receber mensagens nuvem-dispositivo por um dispositivo.
# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
# Property 'to' is set to: '/devices/device1/messages/devicebound',
print('\tto: ' + str(msg.properties.to))
# Property 'message_id' is set to value provided by the service
print('\tmessage_id: ' + str(msg.properties.message_id))
# Other properties are present if they were provided by the service
print('\tcreation_time: ' + str(msg.properties.creation_time))
print('\tcorrelation_id: ' +
str(msg.properties.correlation_id))
print('\tcontent_type: ' + str(msg.properties.content_type))
print('\treply_to_group_id: ' +
str(msg.properties.reply_to_group_id))
print('\tsubject: ' + str(msg.properties.subject))
print('\tuser_id: ' + str(msg.properties.user_id))
print('\tgroup_sequence: ' +
str(msg.properties.group_sequence))
print('\tcontent_encoding: ' +
str(msg.properties.content_encoding))
print('\treply_to: ' + str(msg.properties.reply_to))
print('\tabsolute_expiry_time: ' +
str(msg.properties.absolute_expiry_time))
print('\tgroup_id: ' + str(msg.properties.group_id))
# Message sequence number in the built-in event hub
print('\tx-opt-sequence-number: ' +
str(msg.annotations['x-opt-sequence-number']))
Enviar mensagens de telemetria (cliente do dispositivo)
Também pode enviar mensagens de telemetria a partir de um dispositivo utilizando AMQP. O dispositivo pode opcionalmente fornecer um dicionário de propriedades de aplicações, ou várias propriedades de mensagens, tais como ID de mensagem.
Para encaminhar mensagens com base no corpo da mensagem, tem de definir a content_type
propriedade como estando application/json;charset=utf-8
. Para saber mais sobre mensagens de encaminhamento com base em propriedades de mensagens ou corpo de mensagens, consulte a documentação de sintaxe de encaminhamento de mensagens Hub IoT.
O seguinte código snippet utiliza a biblioteca uAMQP em Python para enviar mensagens dispositivo-a-nuvem a partir de um dispositivo.
# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)
send_client = uamqp.SendClient(uri, debug=True)
# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None
# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }
# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)
send_client.queue_message(message)
results = send_client.send_all_messages()
for result in results:
if result == uamqp.constants.MessageState.SendFailed:
print result
Notas adicionais
As ligações AMQP podem ser interrompidas devido a uma falha de rede ou à expiração do token de autenticação (gerado no código). O cliente de serviço deve lidar com estas circunstâncias e restabelecer a ligação e links, se necessário. Se um símbolo de autenticação expirar, o cliente pode evitar uma queda de ligação renovando proativamente o token antes da sua expiração.
O seu cliente deve ocasionalmente ser capaz de lidar corretamente com as reorientações de ligação. Para entender tal operação, consulte a documentação do seu cliente AMQP.
Passos seguintes
Para saber mais sobre o Protocolo AMQP, consulte a especificação AMQP v1.0.
Para saber mais sobre Hub IoT mensagens, consulte: