Berkomunikasi dengan Azure IoT Hub Anda dengan menggunakan Protokol AMQP

Azure IoT Hub mendukung OASIS Advanced Message Queuing Protocol (AMQP) versi 1.0 untuk memberikan berbagai fungsi melalui titik akhir yang menghadap perangkat dan layanan. Dokumen ini menjelaskan penggunaan klien AMQP untuk terhubung ke Azure IoT Hub untuk menggunakan fungsionalitas Azure IoT Hub.

Klien layanan

Menyambungkan dan mengautentikasi ke Azure IoT Hub (klien layanan)

Untuk terhubung ke Azure IoT Hub dengan menggunakan AMQP, klien dapat menggunakan autentikasi keamanan berbasis klaim (CBS) atau Autentikasi Sederhana dan Lapisan Keamanan (SASL).

Informasi berikut diperlukan untuk klien layanan:

Informasi Nilai
Nama host Azure IoT Hub <iot-hub-name>.azure-devices.net
Nama kunci service
Kunci akses Kunci primer atau sekunder yang terkait dengan layanan
Tanda tangan akses bersama Tanda tangan akses bersama yang berumur pendek dalam format berikut: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Agar mendapatkan kode untuk menghasilkan tanda tangan ini, lihat Mengontrol akses ke Azure IoT Hub.

Cuplikan kode berikut menggunakan pustaka uAMQP dengan Python untuk terhubung ke Azure IoT Hub melalui tautan pengirim.

import uamqp
import urllib
import time

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>'  # example: '/messages/devicebound'

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)

Memanggil pesan cloud ke perangkat (klien layanan)

Untuk mempelajari tentang pertukaran pesan cloud ke perangkat antara layanan dan Azure IoT Hub serta antara perangkat dan Azure IoT Hub, lihat Mengirim pesan cloud ke perangkat dari Azure IoT Hub Anda. Klien layanan menggunakan dua tautan untuk mengirim pesan dan menerima umpan balik untuk pesan yang dikirim sebelumnya dari perangkat, seperti yang dijelaskan dalam tabel berikut:

Dibuat oleh Jenis tautan Jalur tautan Deskripsi
Layanan Tautan pengirim /messages/devicebound Pesan cloud ke perangkat yang ditakdirkan untuk perangkat dikirim ke tautan ini oleh layanan. Pesan yang dikirim melalui tautan ini memiliki Toproperti yang disetel ke jalur tautan penerima perangkat target, /devices/<deviceID>/messages/devicebound.
Layanan Tautan penerima /messages/serviceBound/feedback Pesan umpan balik penyelesaian, penolakan, dan pengabaian yang berasal dari perangkat yang diterima di tautan ini berdasarkan layanan. Untuk informasi selengkapnya tentang pesan umpan balik, lihat Mengirim pesan cloud ke perangkat dari Azure IoT Hub.

Cuplikan kode berikut menunjukkan cara membuat pesan cloud ke perangkat dan mengirimkannya ke perangkat dengan menggunakan pustaka uAMQP di Python.

import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full'  # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
                    application_properties=app_props)

# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()

# Close the client if it's not needed
send_client.close()

Untuk menerima umpan balik, klien layanan membuat tautan penerima. Cuplikan kode berikut menunjukkan cara membuat tautan dengan menggunakan pustaka uAMQP di Python:

import json

operation = '/messages/serviceBound/feedback'

# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
    print('received a message')
    # Check content_type in message property to identify feedback messages coming from device
    if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
        msg_body_raw = msg.get_data()
        msg_body_str = ''.join(msg_body_raw)
        msg_body = json.loads(msg_body_str)
        print(json.dumps(msg_body, indent=2))
        print('******************')
        for feedback in msg_body:
            print('feedback received')
            print('\tstatusCode: ' + str(feedback['statusCode']))
            print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
            print('\tdeviceId: ' + str(feedback['deviceId']))
            print
    else:
        print('unknown message:', msg.properties.content_type)

Seperti yang ditunjukkan dalam kode sebelumnya, pesan umpan balik cloud ke perangkat memiliki tipe konten aplikasi/vnd.microsoft.iothub.feedback.jsdi. Anda bisa menggunakan properti dalam isi JSON pesan untuk menyimpulkan status pengiriman pesan asli:

  • Kunci statusCode dalam isi umpan balik memiliki salah satu nilai berikut: Sukses, Kedaluwarsa, DeliveryCountExceeded, Ditolak, atau Dibersihkan.

  • Kunci deviceId dalam isi umpan balik memiliki ID perangkat target.

  • Kunci originalMessageId dalam isi umpan balik memiliki ID pesan cloud ke perangkat asli yang dikirim oleh layanan. Anda dapat menggunakan status pengiriman ini untuk menghubungkan umpan balik ke pesan cloud ke perangkat.

Menerima pesan data telemetri (klien layanan)

Secara default, Azure IoT Hub Anda menyimpan pesan telemetri perangkat yang diserap di hub peristiwa bawaan. Klien layanan Anda dapat menggunakan Protokol AMQP untuk menerima peristiwa yang disimpan.

Untuk tujuan ini, klien layanan pertama-tama perlu terhubung ke titik akhir Azure IoT Hub dan menerima alamat pengalihan ke hub peristiwa bawaan. Klien layanan kemudian menggunakan alamat yang disediakan untuk terhubung ke hub peristiwa bawaan.

Dalam setiap langkah, klien perlu menyajikan informasi berikut:

  • Info masuk layanan yang valid (token tanda tangan akses bersama layanan).

  • Jalur yang diformat dengan baik ke partisi grup konsumen yang ingin diambil pesannya. Untuk grup konsumen dan ID partisi tertentu, jalur memiliki format berikut: /messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id> (grup konsumen default adalah $Default).

  • Predikat pemfilteran opsional untuk menunjuk titik awal dalam partisi. Predikat ini dapat berupa angka urutan, offset, atau tanda waktu yang diwariskan.

Cuplikan kode berikut menggunakan pustaka uAMQP di Python untuk menunjukkan langkah-langkah sebelumnya:

import json
import uamqp
import urllib
import time

# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
    consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI


def set_endpoint_filter(uri, endpoint_filter=''):
    source_uri = uamqp.address.Source(uri)
    source_uri.set_filter(endpoint_filter)
    return source_uri


receive_client = uamqp.ReceiveClient(
    set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
    batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
    # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
    receive_client.close()

    sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
        redirect.address, policy_name, access_key)
    receive_client = uamqp.ReceiveClient(set_endpoint_filter(
        redirect.address, endpoint_filter), auth=sas_auth, debug=True)

# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
    print('*** received a message ***')
    print(''.join(msg.get_data()))
    print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
    print('\t: ' + str(msg.annotations['x-opt-offset']))
    print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

Untuk ID perangkat tertentu, Azure IoT Hub menggunakan hash ID perangkat untuk menentukan partisi mana yang akan menyimpan pesannya. Cuplikan kode sebelumnya menunjukkan bagaimana peristiwa diterima dari satu partisi tersebut. Namun, perhatikan bahwa aplikasi tipikal sering kali perlu mengambil peristiwa yang disimpan di semua partisi hub peristiwa.

Klien perangkat

Menyambungkan dan mengautentikasi ke Azure IoT Hub (klien perangkat)

Untuk terhubung ke Azure IoT Hub dengan menggunakan AMQP, perangkat dapat menggunakan autentikasi keamanan berbasis klaim (CBS) atau Autentikasi Sederhana dan Lapisan Keamanan (SASL).

Informasi berikut diperlukan untuk klien perangkat:

Informasi Nilai
Nama host Azure IoT Hub <iot-hub-name>.azure-devices.net
Kunci akses Kunci primer atau sekunder yang terkait dengan perangkat
Tanda tangan akses bersama Tanda tangan akses bersama yang berumur pendek dalam format berikut: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Agar mendapatkan kode untuk menghasilkan tanda tangan ini, lihat Mengontrol akses ke Azure IoT Hub.

Cuplikan kode berikut menggunakan pustaka uAMQP dengan Python untuk terhubung ke Azure IoT Hub melalui tautan pengirim.

import uamqp
import urllib
import uuid

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
    device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
    hostname=hostname, device_id=device_id), access_key, None)

# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)

Jalur tautan berikut ini didukung sebagai operasi perangkat:

Dibuat oleh Jenis tautan Jalur tautan Deskripsi
Perangkat Tautan penerima /devices/<deviceID>/messages/devicebound Pesan cloud ke perangkat yang ditakdirkan untuk perangkat diterima di tautan ini oleh setiap perangkat tujuan.
Perangkat Tautan pengirim /devices/<deviceID>/messages/events Pesan perangkat ke cloud yang dikirim dari perangkat dikirim melalui tautan ini.
Perangkat Tautan pengirim /messages/serviceBound/feedback Umpan balik pesan cloud ke perangkat yang dikirim ke layanan melalui tautan ini oleh perangkat.

Menerima perintah cloud ke perangkat (klien perangkat)

Perintah cloud ke perangkat yang dikirim ke perangkat tiba di tautan /devices/<deviceID>/messages/devicebound. Perangkat dapat menerima pesan ini dalam batch, dan menggunakan muatan data pesan, properti pesan, anotasi, atau properti aplikasi dalam pesan sesuai kebutuhan.

Cuplikan kode berikut menggunakan pustaka uAMQP di Python) untuk menerima pesan cloud ke perangkat oleh perangkat.

# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
    device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')
        print(''.join(msg.get_data()))

        # Property 'to' is set to: '/devices/device1/messages/devicebound',
        print('\tto:                     ' + str(msg.properties.to))

        # Property 'message_id' is set to value provided by the service
        print('\tmessage_id:             ' + str(msg.properties.message_id))

        # Other properties are present if they were provided by the service
        print('\tcreation_time:          ' + str(msg.properties.creation_time))
        print('\tcorrelation_id:         ' +
              str(msg.properties.correlation_id))
        print('\tcontent_type:           ' + str(msg.properties.content_type))
        print('\treply_to_group_id:      ' +
              str(msg.properties.reply_to_group_id))
        print('\tsubject:                ' + str(msg.properties.subject))
        print('\tuser_id:                ' + str(msg.properties.user_id))
        print('\tgroup_sequence:         ' +
              str(msg.properties.group_sequence))
        print('\tcontent_encoding:       ' +
              str(msg.properties.content_encoding))
        print('\treply_to:               ' + str(msg.properties.reply_to))
        print('\tabsolute_expiry_time:   ' +
              str(msg.properties.absolute_expiry_time))
        print('\tgroup_id:               ' + str(msg.properties.group_id))

        # Message sequence number in the built-in event hub
        print('\tx-opt-sequence-number:  ' +
              str(msg.annotations['x-opt-sequence-number']))

Mengirim pesan data telemetri (klien perangkat)

Anda juga dapat mengirim pesan telemetri dari perangkat dengan menggunakan AMQP. Perangkat secara opsional dapat menyediakan kamus properti aplikasi, atau berbagai properti pesan, seperti ID pesan.

Untuk merutekan pesan berdasarkan isi pesan, Anda harus mengatur properti content_type menjadi application/json;charset=utf-8. Untuk mempelajari lebih lanjut tentang perutean pesan baik berdasarkan properti pesan atau isi pesan, silakan lihat dokumentasi sintaks kueri perutean pesan IoT Hub.

Cuplikan kode berikut menggunakan pustaka uAMQP di Python untuk mengirim pesan perangkat ke cloud dari perangkat.

# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)

send_client.queue_message(message)
results = send_client.send_all_messages()

for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print result

Catatan tambahan

  • Koneksi AMQP mungkin terganggu karena kesalahan jaringan atau berakhirnya token autentikasi (dihasilkan dalam kode). Klien layanan harus menangani keadaan ini dan membangun kembali koneksi dan tautan, jika diperlukan. Jika token autentikasi kedaluarsa, klien dapat menghindari penurunan koneksi dengan memperbarui token secara proaktif sebelum kedaluwarsa.

  • Klien Anda terkadang harus dapat menangani pengalihan tautan dengan benar. Untuk memahami operasi tersebut, lihat dokumentasi klien AMQP Anda.

Langkah berikutnya

Untuk mempelajari lebih lanjut tentang Protokol AMQP, lihat spesifikasi AMQP v1.0.

Untuk mempelajari selengkapnya tentang pesan Azure IoT Hub, lihat: