Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Azure IoT Hub ondersteunt OASIS Advanced Message Queuing Protocol (AMQP) versie 1.0 om verschillende functies te leveren via apparaatgerichte en servicegerichte eindpunten. In dit document wordt het gebruik van AMQP-clients beschreven om verbinding te maken met een IoT-hub om ioT Hub-functionaliteit te gebruiken.
Serviceclient
Verbinding maken en verifiëren met een IoT-hub (serviceclient)
Als u verbinding wilt maken met een IoT-hub met AMQP, kan een client gebruikmaken van de op claims gebaseerde beveiliging (CBS) of SASL-verificatie (Simple Authentication and Security Layer).
De volgende informatie is vereist voor de serviceclient:
| Information | Waarde |
|---|---|
| Hostnaam van IoT Hub | <iot-hub-name>.azure-devices.net |
| Sleutelnaam | service |
| Toegangssleutel | Een primaire of secundaire sleutel die is gekoppeld aan de service |
| Gedeelde Toegangssignatuur | Een handtekening voor gedeelde toegang met een korte levensduur in de volgende indeling: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Zie Toegang tot IoT Hub beheren om de code voor het genereren van deze handtekening te verkrijgen. |
In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om via een afzenderkoppeling verbinding te maken met een IoT-hub.
import uamqp
import urllib
import time
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>' # example: '/messages/devicebound'
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)
Cloud-naar-apparaat-berichten aanroepen (serviceclient)
Voor meer informatie over de uitwisseling van cloud-naar-apparaat-berichten tussen de service en de IoT-hub en tussen het apparaat en de IoT-hub, raadpleegt u Informatie over cloud-naar-apparaat-berichten van een IoT-hub. De serviceclient gebruikt twee koppelingen om berichten te verzenden en feedback te ontvangen voor eerder verzonden berichten van apparaten, zoals beschreven in de volgende tabel:
| Gemaakt door | Koppelingstype | Koppelingspad | Description |
|---|---|---|---|
| Dienst | Koppeling naar afzender | /messages/devicebound |
De koppeling waarnaar de service cloud-naar-apparaat-berichten verzendt die bestemd zijn voor apparaten. Berichten die via deze koppeling worden verzonden, hebben hun To eigenschap ingesteld op het ontvangerlinkpad van het doelapparaat, /devices/<deviceID>/messages/devicebound. |
| Dienst | Ontvangerkoppeling | /messages/serviceBound/feedback |
Voltooiings-, afwijzings- en afbrekingsfeedbackberichten die afkomstig zijn van apparaten die op deze koppeling door de service zijn ontvangen. Zie Informatie over cloud-naar-apparaat-berichten van een IoT-hub voor meer informatie over feedbackberichten. |
Het volgende codefragment laat zien hoe u een cloud-naar-apparaat-bericht maakt en verzendt naar een apparaat met behulp van de uAMQP-bibliotheek in Python.
import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full' # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
application_properties=app_props)
# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()
# Close the client if it's not needed
send_client.close()
Om feedback te ontvangen, maakt de serviceclient een ontvangerkoppeling. Het volgende codefragment laat zien hoe u een koppeling maakt met behulp van de uAMQP-bibliotheek in Python:
import json
operation = '/messages/serviceBound/feedback'
# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
print('received a message')
# Check content_type in message property to identify feedback messages coming from device
if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
msg_body_raw = msg.get_data()
msg_body_str = ''.join(msg_body_raw)
msg_body = json.loads(msg_body_str)
print(json.dumps(msg_body, indent=2))
print('******************')
for feedback in msg_body:
print('feedback received')
print('\tstatusCode: ' + str(feedback['statusCode']))
print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
print('\tdeviceId: ' + str(feedback['deviceId']))
print
else:
print('unknown message:', msg.properties.content_type)
Zoals weergegeven in de voorgaande code, heeft een feedbackbericht van cloud naar apparaat een inhoudstype application/vnd.microsoft.iothub.feedback.json. U kunt de eigenschappen in de JSON-hoofdtekst van het bericht gebruiken om de bezorgingsstatus van het oorspronkelijke bericht af te stellen:
De sleutel
statusCodein de feedbacktekst heeft een van de volgende waarden: Success, Expired, DeliveryCountExceeded, Rejected of Purged.De sleutel
deviceIdin de inhoud van de feedback heeft de ID van het doelapparaat.De sleutel
originalMessageIdin de feedbacktekst heeft de ID van het oorspronkelijke cloud-naar-apparaat-bericht dat door de service is verzonden. U kunt deze bezorgingsstatus gebruiken om feedback te correleren met cloud-naar-apparaat-berichten.
Telemetrieberichten ontvangen (serviceclient)
Standaard worden in uw IoT-hub opgenomen telemetrieberichten van apparaten opgeslagen in een ingebouwde Event Hub. Uw serviceclient kan het AMQP-protocol gebruiken om de opgeslagen gebeurtenissen te ontvangen.
Hiervoor moet de serviceclient eerst verbinding maken met het IoT Hub-eindpunt en een omleidingsadres ontvangen naar de ingebouwde Event Hubs. De serviceclient gebruikt vervolgens het opgegeven adres om verbinding te maken met de ingebouwde Event Hub.
In elke stap moet de client de volgende stukjes informatie presenteren:
Geldige servicereferenties (shared access signature-token van de service).
Een goed opgemaakt pad naar de partitie van de consumentengroep waaruit berichten moeten worden opgehaald. Voor een bepaalde consumentengroep en partitie-id heeft het pad de volgende indeling:
/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>(de standaardconsumergroep is$Default).Een optioneel filterpredicaat om een beginpunt in de partitie aan te wijzen. Dit predicaat kan de vorm hebben van een volgnummer, verschuiving of enqueued tijdstempel.
In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om de voorgaande stappen te demonstreren:
import json
import uamqp
import urllib
import time
# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
consumer_group='$Default', p_id=0)
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'
# Helper function to set the filtering predicate on the source URI
def set_endpoint_filter(uri, endpoint_filter=''):
source_uri = uamqp.address.Source(uri)
source_uri.set_filter(endpoint_filter)
return source_uri
receive_client = uamqp.ReceiveClient(
set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
receive_client.close()
sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
redirect.address, policy_name, access_key)
receive_client = uamqp.ReceiveClient(set_endpoint_filter(
redirect.address, endpoint_filter), auth=sas_auth, debug=True)
# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
print('\t: ' + str(msg.annotations['x-opt-offset']))
print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))
Voor een bepaalde apparaat-id gebruikt de IoT-hub een hash van de apparaat-id om te bepalen in welke partitie de berichten moeten worden opgeslagen. Het voorgaande codefragment laat zien hoe gebeurtenissen worden ontvangen van één dergelijke partitie. Een typische toepassing moet echter vaak gebeurtenissen ophalen die zijn opgeslagen in alle Event Hub-partities.
Apparaatclient
Verbinding maken en verifiëren bij een IoT-hub (apparaatclient)
Als u via AMQP verbinding wilt maken met een IoT-hub, kan een apparaat gebruikmaken van op claims gebaseerde beveiliging (CBS) of SASL-verificatie (Simple Authentication and Security Layer).
De volgende informatie is vereist voor de apparaatclient:
| Information | Waarde |
|---|---|
| Hostnaam van IoT Hub | <iot-hub-name>.azure-devices.net |
| Toegangssleutel | Een primaire of secundaire sleutel die is gekoppeld aan het apparaat |
| Gedeelde Toegangssignatuur | Een handtekening voor gedeelde toegang met een korte levensduur in de volgende indeling: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Zie Toegang tot IoT Hub beheren met handtekeningen voor gedeelde toegang om de code op te halen voor het genereren van deze handtekening. |
In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om via een afzenderkoppeling verbinding te maken met een IoT-hub.
import uamqp
import urllib
import uuid
# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
hostname=hostname, device_id=device_id), access_key, None)
# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)
De volgende koppelingspaden worden ondersteund als apparaatbewerkingen:
| Gemaakt door | Koppelingstype | Koppelingspad | Description |
|---|---|---|---|
| Apparaten | Ontvangerkoppeling | /devices/<deviceID>/messages/devicebound |
De koppeling waar elk doelapparaat cloud-naar-apparaat-berichten ontvangt die bestemd zijn voor apparaten. |
| Apparaten | Koppeling naar afzender | /devices/<deviceID>/messages/events |
Apparaat-naar-cloud-berichten die vanaf een apparaat worden verzonden, worden via deze koppeling verzonden. |
| Apparaten | Koppeling naar afzender | /messages/serviceBound/feedback |
Feedback van cloud-naar-apparaatberichten die via deze koppeling door apparaten naar de service zijn verzonden. |
Cloud-naar-apparaat-opdrachten ontvangen (apparaatclient)
Cloud-naar-apparaat-opdrachten die naar apparaten worden verzonden, komen binnen via een /devices/<deviceID>/messages/devicebound koppeling. Apparaten kunnen deze berichten in batches ontvangen en de nettolading van berichtgegevens, berichteigenschappen, aantekeningen of toepassingseigenschappen in het bericht gebruiken, indien nodig.
In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om cloud-naar-apparaat-berichten van een apparaat te ontvangen.
# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
# Property 'to' is set to: '/devices/device1/messages/devicebound',
print('\tto: ' + str(msg.properties.to))
# Property 'message_id' is set to value provided by the service
print('\tmessage_id: ' + str(msg.properties.message_id))
# Other properties are present if they were provided by the service
print('\tcreation_time: ' + str(msg.properties.creation_time))
print('\tcorrelation_id: ' +
str(msg.properties.correlation_id))
print('\tcontent_type: ' + str(msg.properties.content_type))
print('\treply_to_group_id: ' +
str(msg.properties.reply_to_group_id))
print('\tsubject: ' + str(msg.properties.subject))
print('\tuser_id: ' + str(msg.properties.user_id))
print('\tgroup_sequence: ' +
str(msg.properties.group_sequence))
print('\tcontent_encoding: ' +
str(msg.properties.content_encoding))
print('\treply_to: ' + str(msg.properties.reply_to))
print('\tabsolute_expiry_time: ' +
str(msg.properties.absolute_expiry_time))
print('\tgroup_id: ' + str(msg.properties.group_id))
# Message sequence number in the built-in event hub
print('\tx-opt-sequence-number: ' +
str(msg.annotations['x-opt-sequence-number']))
Telemetrieberichten verzenden (apparaatclient)
U kunt ook telemetrieberichten vanaf een apparaat verzenden met AMQP. Het apparaat kan desgewenst een woordenlijst met toepassingseigenschappen of verschillende berichteigenschappen opgeven, zoals bericht-id.
Als u berichten wilt routeren op basis van de berichttekst, moet u de content_type eigenschap instellen op application/json;charset=utf-8. Zie de documentatie over de IoT Hub-berichtrouteringssyntaxis voor meer informatie over het routeren van berichten op basis van de eigenschappen of inhoud van de berichten.
In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om apparaat-naar-cloud-berichten van een apparaat te verzenden.
# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)
send_client = uamqp.SendClient(uri, debug=True)
# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None
# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }
# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)
send_client.queue_message(message)
results = send_client.send_all_messages()
for result in results:
if result == uamqp.constants.MessageState.SendFailed:
print result
Overige aantekeningen
De AMQP-verbindingen kunnen worden onderbroken vanwege een netwerkstoring of het verlopen van het verificatietoken (gegenereerd in de code). De serviceclient moet deze omstandigheden afhandelen en de verbinding en koppelingen herstellen, indien nodig. Als een verificatietoken verloopt, kan de client het wegvallen van de verbinding voorkomen door het token proactief te vernieuwen voordat het verloopt.
Uw client moet af en toe koppelingsomleidingen correct kunnen afhandelen. Als u een dergelijke bewerking wilt begrijpen, raadpleegt u de documentatie van uw AMQP-client.
Volgende stappen
Zie de AMQP v1.0-specificatie voor meer informatie over het AMQP-protocol.
Zie voor meer informatie over berichten in de IoT Hub: