Python Code to create IOT Hub Enrollemnt Group

Vinod R S 0 Reputation points
2023-05-26T11:00:11.09+00:00

I am trying to create a IOT Create an enrollment group using Certificates . I am not able to find any code that supports. https://learn.microsoft.com/en-us/rest/api/iot-dps/service/enrollment-group/create-or-update#x509certificates Is the closest I found, do we have one that same in Python

Azure IoT Hub
Azure IoT Hub
An Azure service that enables bidirectional communication between internet of things (IoT) devices and applications.
1,272 questions
0 comments No comments
{count} votes

1 answer

Sort by: Most helpful
  1. AshokPeddakotla-MSFT 35,971 Reputation points Moderator
    2023-05-26T13:31:06.0433333+00:00

    Vinod R S Welcome to Microsoft Q&A forum!

    Have you checked the documentation Provision multiple X.509 devices using enrollment groups already?

    Python Code to create IOT Hub Enrollemnt Group

    The Samples are available here:

    User's image

    Example reference provision_x509.py :

    import os
    import asyncio
    from azure.iot.device import X509
    from azure.iot.device.aio import ProvisioningDeviceClient
    from azure.iot.device.aio import IoTHubDeviceClient
    from azure.iot.device import Message
    import uuid
    
    
    provisioning_host = os.getenv("PROVISIONING_HOST")
    id_scope = os.getenv("PROVISIONING_IDSCOPE")
    registration_id = os.getenv("DPS_X509_REGISTRATION_ID")
    messages_to_send = 10
    
    
    async def main():
        x509 = X509(
            cert_file=os.getenv("X509_CERT_FILE"),
            key_file=os.getenv("X509_KEY_FILE"),
            pass_phrase=os.getenv("X509_PASS_PHRASE"),
        )
    
        provisioning_device_client = ProvisioningDeviceClient.create_from_x509_certificate(
            provisioning_host=provisioning_host,
            registration_id=registration_id,
            id_scope=id_scope,
            x509=x509,
        )
    
        registration_result = await provisioning_device_client.register()
    
        print("The complete registration result is")
        print(registration_result.registration_state)
    
        if registration_result.status == "assigned":
            print("Will send telemetry from the provisioned device")
            device_client = IoTHubDeviceClient.create_from_x509_certificate(
                x509=x509,
                hostname=registration_result.registration_state.assigned_hub,
                device_id=registration_result.registration_state.device_id,
            )
    
            # Connect the client.
            await device_client.connect()
    
            async def send_test_message(i):
                print("sending message #" + str(i))
                msg = Message("test wind speed " + str(i))
                msg.message_id = uuid.uuid4()
                await device_client.send_message(msg)
                print("done sending message #" + str(i))
    
            # send `messages_to_send` messages in parallel
            await asyncio.gather(*[send_test_message(i) for i in range(1, messages_to_send + 1)])
    
            # finally, disconnect
            await device_client.disconnect()
        else:
            print("Can not send telemetry from the provisioned device")
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    
        # If using Python 3.6 use the following code instead of asyncio.run(main()):
        # loop = asyncio.get_event_loop()
        # loop.run_until_complete(main())
        # loop.close()
    
    

    I would suggest you, please check the Azure IoT SDK Python samples based on your specific needs using X509/ Symmetric Keys and let us know if you need further help.

    Hope this helps. Do let us know if you any further queries.

    If this answers your query, do click Accept Answer and Yes for was this answer helpful. And, if you have any further query do let us know.


Your answer

Answers can be marked as Accepted Answers by the question author, which helps users to know the answer solved the author's problem.