I have created an initial prototype to connect from an IoT device to an IoT hub using UAMPQ based on other examples on the internet. The code below shows working code for this that was verified using Azure IoT Explorer.
I attempted to then port the example application to Pika as the UAMPQ git repo reports that it is being deprecated in the first quarter of 2025 and that e.g. Azure Service Bus has already transitioned to another AMPQ python library.
When running the same example using Pika I run into issues with SSL where it reports an unexpected EOF while negotiating a connection. Exact error is underneath the code sample. Are there working examples using Pika or another equivalent library?
Code sample:
import pika
from base64 import b64encode, b64decode
from hashlib import sha256
from hmac import HMAC
from time import time
from urllib.parse import quote_plus, urlencode
import uamqp
sas_token = ''
auth_username = ''
def _generate_sas_token(uri, policy, key, expiry=None):
if not expiry:
expiry = time() + 3600 # Default to 1 hour.
encoded_uri = quote_plus(uri)
ttl = int(expiry)
sign_key = '%s\n%d' % (encoded_uri, ttl)
signature = b64encode(HMAC(b64decode(key), sign_key.encode('utf-8'), sha256).digest())
result = {
'sr': uri,
'sig': signature,
'se': str(ttl)}
#if policy:
# result['skn'] = policy
return 'SharedAccessSignature ' + urlencode(result)
def _build_iothub_amqp_endpoint_from_target(target, deviceendpoint):
global sas_token
global auth_username
hub_name = target['hostname'].split('.')[0]
endpoint = "{}@sas.{}".format(target['device'], hub_name)
auth_username = endpoint
endpoint = quote_plus(endpoint)
sas_token = _generate_sas_token(target['hostname'] + deviceendpoint,
target['key_name'],
target['access_key'],
time() + 36000)
endpoint = endpoint + ":{}@{}".format(quote_plus(sas_token), target['hostname'])
return endpoint
def transmitUampq(target : str, msg_content : bytes):
# NOTE: Using UAMPQ this would work
msg_props = uamqp.message.MessageProperties()
message = uamqp.Message(msg_content, properties=msg_props)
send_client = uamqp.SendClient(target, debug=True)
send_client.queue_message(message)
results = send_client.send_all_messages()
assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]
print("INFO: Message sent using UAMQP client.")
def transmitPika(address : str, message : bytes, timeout_s : int = 100):
try:
params = pika.URLParameters(address)
params.socket_timeout = timeout_s
params.blocked_connection_timeout = timeout_s
params.connection_attempts = 3
# FAILS ON MAKING A CONNECTION
print(f"Creating connection to {address} with timeout {timeout_s} and blocked_connection_timeout {timeout_s} and connection_attempts {params.connection_attempts}...")
connection = pika.BlockingConnection(params) # Connect to IoT Hub - fails here
print("INFO: Connected to IoT Hub")
channel = connection.channel() # start a channel
channel.confirm_delivery()
# send a message
channel.basic_publish(exchange=''
, routing_key=''
, body=message
, properties=pika.BasicProperties(delivery_mode=1)
, mandatory=True)
print ("INFO: Message sent to consumer")
connection.close()
return 0
except Exception as e:
print(f"ERROR: Unknown exception whilst trying to transmit the RMQ message: {e}")
return -2
def start_iot_hub_send(live_iothub_config):
global sas_token
global auth_username
# Work out the endpoint to send this to securely
deviceendpoint='/devices/{}'.format(live_iothub_config['device'])
operation = deviceendpoint + '/messages/events'
endpoint = _build_iothub_amqp_endpoint_from_target(live_iothub_config, deviceendpoint)
target = 'amqps://' + endpoint + operation
print("INFO: Target: {}".format(target))
# Send the message with an up to date client
message = b"Hello World!"
print("INFO: Sending message: {}".format(message))
# transmitPika(target, message) # Doesn't work
transmitUampq(target, message) # Works
if __name__ == '__main__':
# TODO Obtain this information from Azure
config = {}
config['hostname'] = '' # e.g. 'sdbiothub1.azure-devices.net'
config['device'] = '' # e.g. 'amqptest'
config['key_name'] = '' # leave empty string
config['access_key'] = '' # e.g 'P38y2x3vdWNGu7Fd9Tqq9saPgDry/kZTyaKmpy1XYhg='
start_iot_hub_send(config)
Error message received:
The protocol returned by the server is not supported: ('StreamLostError: ("Stream connection lost: SSLEOFError(8, \'[SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:2578)\')",)',)