ジョブのスケジュールとブロードキャスト (Python)

Azure IoT Hub を使用して、数百万のデバイスを更新するジョブのスケジュールと追跡を行います。 ジョブを使用して、次の操作を行います。

  • 必要なプロパティを更新する
  • タグを更新する
  • ダイレクト メソッドを呼び出す

概念的には、ジョブはこれらのアクションのいずれかをラップし、デバイス ツイン クエリで定義される一連のデバイスに対して実行の進行状況を追跡します。 たとえば、バックエンド アプリでは、ジョブを使用して、10,000 台のデバイスに対して reboot メソッドを呼び出すことができます。これは、デバイス ツイン クエリで指定され、将来の時刻にスケジュールされます。 次に、このアプリケーションを使って、これらの各デバイスが reboot メソッドを受信し実行する進行状況を追跡できます。

これらの各機能について詳しくは、次の記事をご覧ください。

Note

この記事で説明されている機能は、Standard レベルの IoT Hub でのみ使用できます。 Basic および Standard または Free の IoT Hub のレベルの詳細については、「ソリューションに適した IoT Hub のレベルを選択する」を参照してください。

この記事では、2 つの Python アプリを作成する方法について説明します。

  • バックエンド アプリから呼び出すことができる lockDoor というダイレクト メソッドを実装する Python のシミュレートされたデバイス アプリ simDevice.py

  • 2 つのジョブを作成する Python コンソール アプリ scheduleJobService.py。 1 つのジョブが lockDoor ダイレクト メソッドを呼び出し、別のジョブが必要なプロパティの更新を複数のデバイスに送信します。

注意

デバイスとバックエンド アプリの両方の構築に使用できる SDK ツールに関する詳細については、「Azure IoT Hub SDK」を参照してください。

前提条件

  • アクティブな Azure アカウントアカウントがない場合、Azure 試用版にサインアップして、最大 10 件の無料 Mobile Apps を入手できます。 (アカウントがない場合は、無料アカウント を数分で作成できます)。

  • IoT Hub。 CLI または Azure portal を使って作成します。

  • 登録済みのデバイス。 Azure portal に登録してください。

  • Python バージョン 3.7 以降をお勧めします。 必ず、セットアップに必要な 32 ビットまたは 64 ビットのインストールを使用してください。 インストール中に求められた場合は、プラットフォーム固有の環境変数に Python を追加します。

シミュレート対象デバイス アプリの作成

このセクションでは、クラウドによって呼び出されたダイレクト メソッドに応答する Python コンソール アプリを作成します。このアプリはシミュレートされた lockDoor メソッドをトリガーします。

  1. コマンド プロンプトで次のコマンドを実行して azure-iot-device パッケージをインストールします。

    pip install azure-iot-device
    
  2. テキスト エディターを使用して、作業ディレクトリに新しい simDevice.py ファイルを作成します。

  3. simDevice.py ファイルの先頭に、次の import ステートメントと変数を追加します。 deviceConnectionString を上記で作成したデバイスの接続文字列に置き換えます。

    import time
    from azure.iot.device import IoTHubDeviceClient, MethodResponse
    
    CONNECTION_STRING = "{deviceConnectionString}"
    
  4. 次の関数を定義します。この関数によってクライアントがインスタンス化され、lockDoor メソッドに応答し、さらにデバイス ツイン更新を受信するように構成されます。

    def create_client():
        # Instantiate the client
        client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
    
        # Define behavior for responding to the lockDoor direct method
        def method_request_handler(method_request):
            if method_request.name == "lockDoor":
                print("Locking Door!")
    
                resp_status = 200
                resp_payload = {"Response": "lockDoor called successfully"}
                method_response = MethodResponse.create_from_method_request(
                    method_request=method_request,
                    status=resp_status,
                    payload=resp_payload
                )
                client.send_method_response(method_response)
    
        # Define behavior for receiving a twin patch
        def twin_patch_handler(twin_patch):
            print("")
            print("Twin desired properties patch received:")
            print(twin_patch)
    
        # Set the handlers on the client
        try:
            print("Beginning to listen for 'lockDoor' direct method invocations...")
            client.on_method_request_received = method_request_handler
            print("Beginning to listen for updates to the Twin desired properties...")
            client.on_twin_desired_properties_patch_received = twin_patch_handler
        except:
            # If something goes wrong while setting the handlers, clean up the client
            client.shutdown()
            raise
    
  5. サンプルを実行する次のコードを追加します。

    def main():
        print ("Starting the IoT Hub Python jobs sample...")
        client = create_client()
    
        print ("IoTHubDeviceClient waiting for commands, press Ctrl-C to exit")
        try:
            while True:
                time.sleep(100)
        except KeyboardInterrupt:
            print("IoTHubDeviceClient sample stopped!")
        finally:
            # Graceful exit
            print("Shutting down IoT Hub Client")
            client.shutdown()
    
    
    if __name__ == '__main__':
        main()
    
  6. simDevice.py ファイルを保存して閉じます。

Note

わかりやすくするために、この記事では再試行ポリシーは実装しません。 運用環境のコードでは、「一時的な障害の処理」の記事で推奨されているように、再試行ポリシー (指数関数的バックオフなど) を実装することをお勧めします。

IoT ハブ接続文字列を取得する

この記事では、デバイス上でダイレクト メソッドを呼び出し、デバイス ツインを更新するバックエンド サービスを作成します。 デバイス上でダイレクト メソッドを呼び出すには、サービスにサービス接続アクセス許可が必要です。 また、このサービスで ID レジストリの読み取りと書き込みを行うために、レジストリ読み取りおよびレジストリ書き込みアクセス許可も必要です。 これらのアクセス許可だけを含んだ既定の共有アクセス ポリシーは存在しないため、共有アクセス ポリシーを独自に作成する必要があります。

サービス接続レジストリ読み取り、およびレジストリ書き込みのアクセス許可を付与する共有アクセス ポリシーを作成し、そのポリシーの接続文字列を取得するには、次の手順を実行します。

  1. Azure portal で IoT ハブを開きます。 IoT ハブに移動するための最も簡単な方法は、[リソース グループ] を選択し、IoT ハブがあるリソース グループを選択した後、リソースの一覧から目的の IoT ハブを選択することです。

  2. IoT ハブの左側のウィンドウで、 [共有アクセス ポリシー] を選択します。

  3. ポリシー一覧の上にある上部のメニューから、[共有アクセス ポリシーの追加] を選びます。

  4. [共有アクセス ポリシーを追加] ペインで、対象のポリシーのわかりやすい名前を入力します (例: serviceAndRegistryReadWrite)。 [アクセス許可] で、[レジストリ書き込み][サービス接続] を選び ([レジストリ書き込み] を選ぶと [レジストリ読み取り] が自動的に選択されます)、次に [追加] を選びます。

    Azure portal の IoT Hub で新しいアクセス ポリシーを追加する方法を示すスクリーンショット。

  5. [共有アクセス ポリシー] ページに戻り、ポリシーの一覧から新しいポリシーを選びます。

  6. 表示される新しいペインで、[プライマリ接続文字列] のコピー アイコンを選び、値を保存します。

    Azure portal の IoT Hub でアクセス ポリシーからプライマリ接続文字列を取得する方法を示すスクリーンショット。

IoT Hub の共有アクセス ポリシーとアクセス許可の詳細については、「アクセス制御とアクセス許可」を参照してください。

ダイレクト メソッドを呼び出し、デバイス ツインのプロパティを更新するジョブのスケジュール

このセクションでは、ダイレクト メソッドを使用してデバイスでリモート lockDoor を開始する Python コンソール アプリを作成し、さらにデバイス ツインの必要なプロパティを更新します。

  1. コマンド プロンプトで次のコマンドを実行して azure-iot-hub パッケージをインストールします。

    pip install azure-iot-hub
    
  2. テキスト エディターを使用して、作業ディレクトリに新しい scheduleJobService.py ファイルを作成します。

  3. scheduleJobService.py ファイルの先頭に、次の import ステートメントと変数を追加します。 {IoTHubConnectionString} プレースホルダーを、先ほど「IoT ハブ接続文字列を取得する」でコピーしておいた IoT ハブ接続文字列に置き換えます。 {deviceId} プレースホルダーを、登録済みデバイスのデバイス ID (名前) に置き換えます。

    import os
    import sys
    import datetime
    import time
    import threading
    import uuid
    import msrest
    
    from azure.iot.hub import IoTHubJobManager, IoTHubRegistryManager
    from azure.iot.hub.models import JobProperties, JobRequest, Twin, TwinProperties, CloudToDeviceMethod
    
    CONNECTION_STRING = "{IoTHubConnectionString}"
    DEVICE_ID = "{deviceId}"
    
    METHOD_NAME = "lockDoor"
    METHOD_PAYLOAD = "{\"lockTime\":\"10m\"}"
    UPDATE_PATCH = {"building":43,"floor":3}
    TIMEOUT = 60
    WAIT_COUNT = 5
    
    # Create IoTHubJobManager
    iothub_job_manager = IoTHubJobManager.from_connection_string(CONNECTION_STRING)
    
    
  4. ダイレクト メソッドとデバイス ツインを呼び出すジョブを実行する次のメソッドを追加します。

    def device_method_job(job_id, device_id, execution_time):
        print ( "" )
        print ( "Scheduling job: " + str(job_id) )
    
        job_request = JobRequest()
        job_request.job_id = job_id
        job_request.type = "scheduleDeviceMethod"
        job_request.start_time = datetime.datetime.utcnow().isoformat()
        job_request.cloud_to_device_method = CloudToDeviceMethod(method_name=METHOD_NAME, payload=METHOD_PAYLOAD)
        job_request.max_execution_time_in_seconds = execution_time
        job_request.query_condition = "DeviceId in ['{}']".format(device_id)
    
        new_job_response = iothub_job_manager.create_scheduled_job(job_id, job_request)
    
    def device_twin_job(job_id, device_id, execution_time):
        print ( "" )
        print ( "Scheduling job " + str(job_id) )
    
        job_request = JobRequest()
        job_request.job_id = job_id
        job_request.type = "scheduleUpdateTwin"
        job_request.start_time = datetime.datetime.utcnow().isoformat()
        job_request.update_twin = Twin(etag="*", properties=TwinProperties(desired=UPDATE_PATCH))
        job_request.max_execution_time_in_seconds = execution_time
        job_request.query_condition = "DeviceId in ['{}']".format(device_id)
    
        new_job_response = iothub_job_manager.create_scheduled_job(job_id, job_request)
    
    
  5. ジョブをスケジュールし、ジョブの状態を更新する次のコードを追加します。 main ルーチンも含めます。

    def iothub_jobs_sample_run():
        try:
            method_job_id = uuid.uuid4()
            device_method_job(method_job_id, DEVICE_ID, TIMEOUT)
    
            print ( "" )
            print ( "Direct method called with Job Id: " + str(method_job_id) )
    
            twin_job_id = uuid.uuid4()
            device_twin_job(twin_job_id, DEVICE_ID, TIMEOUT)
    
            print ( "" )
            print ( "Device twin called with Job Id: " + str(twin_job_id) )
    
            while True:
                print ( "" )
    
                method_job_status = iothub_job_manager.get_scheduled_job(method_job_id)
                print ( "...job " + str(method_job_id) + " " + method_job_status.status )
    
                twin_job_status = iothub_job_manager.get_scheduled_job(twin_job_id)
                print ( "...job " + str(twin_job_id) + " " + twin_job_status.status )
    
                print ( "Job status posted, press Ctrl-C to exit" )
                time.sleep(WAIT_COUNT)
    
        except msrest.exceptions.HttpOperationError as ex:
            print ( "" )
            print ( "Http error {}".format(ex.response.text) )
            return
        except Exception as ex:
            print ( "" )
            print ( "Unexpected error {}".format(ex) )
            return
        except KeyboardInterrupt:
            print ( "" )
            print ( "IoTHubService sample stopped" )
    
    if __name__ == '__main__':
        print ( "Starting the IoT Hub jobs Python sample..." )
        print ( "    Connection string = {0}".format(CONNECTION_STRING) )
        print ( "    Device ID         = {0}".format(DEVICE_ID) )
    
        iothub_jobs_sample_run()
    
  6. scheduleJobService.py ファイルを保存して閉じます。

アプリケーションの実行

これで、アプリケーションを実行する準備が整いました。

  1. 作業ディレクトリのコマンド プロンプトで、次のコマンドを実行して再起動のダイレクト メソッドのリッスンを開始します。

    python simDevice.py
    
  2. 作業ディレクトリのコマンド プロンプトで、次のコマンドを実行して、ドアをロックしてツインを更新するジョブをトリガーします。

    python scheduleJobService.py
    
  3. ダイレクト メソッドとデバイス ツインの更新に対するデバイスの応答がコンソールに表示されます。

    IoT Hub ジョブ サンプル 1 -- デバイスの出力

    IoT Hub ジョブ サンプル 2 -- デバイスの出力

次の手順

この記事では、ダイレクト メソッドを実行してデバイス ツインのプロパティを更新するようにジョブをスケジュールしました。

IoT Hub とデバイス管理パターンの調査を続けるには、「Raspberry Pi 3 B+ 参照イメージを使用した Device Update for Azure IoT Hub のチュートリアル」でイメージを更新してください。