ジョブのスケジュールとブロードキャスト (Python)
Azure IoT Hub を使用して、数百万のデバイスを更新するジョブのスケジュールと追跡を行います。 ジョブを使用して、次の操作を行います。
- 必要なプロパティを更新する
- タグを更新する
- ダイレクト メソッドを呼び出す
概念的には、ジョブはこれらのアクションのいずれかをラップし、デバイス ツイン クエリで定義される一連のデバイスに対して実行の進行状況を追跡します。 たとえば、バックエンド アプリでは、ジョブを使用して、10,000 台のデバイスに対して reboot メソッドを呼び出すことができます。これは、デバイス ツイン クエリで指定され、将来の時刻にスケジュールされます。 次に、このアプリケーションを使って、これらの各デバイスが reboot メソッドを受信し実行する進行状況を追跡できます。
これらの各機能について詳しくは、次の記事をご覧ください。
デバイス ツインとプロパティ: デバイス ツインの概要および IoT Hub のデバイス ツインの理解と使用
ダイレクト メソッド: IoT Hub 開発者ガイド - ダイレクト メソッド
Note
この記事で説明されている機能は、Standard レベルの IoT Hub でのみ使用できます。 Basic および Standard または Free の IoT Hub のレベルの詳細については、「ソリューションに適した IoT Hub のレベルを選択する」を参照してください。
この記事では、2 つの Python アプリを作成する方法について説明します。
バックエンド アプリから呼び出すことができる lockDoor というダイレクト メソッドを実装する Python のシミュレートされたデバイス アプリ simDevice.py。
2 つのジョブを作成する Python コンソール アプリ scheduleJobService.py。 1 つのジョブが lockDoor ダイレクト メソッドを呼び出し、別のジョブが必要なプロパティの更新を複数のデバイスに送信します。
注意
デバイスとバックエンド アプリの両方の構築に使用できる SDK ツールに関する詳細については、「Azure IoT Hub SDK」を参照してください。
前提条件
アクティブな Azure アカウントアカウントがない場合、Azure 試用版にサインアップして、最大 10 件の無料 Mobile Apps を入手できます。 (アカウントがない場合は、無料アカウント を数分で作成できます)。
Azure サブスクリプション内の IoT ハブ。 ハブがまだない場合は、「IoT ハブの作成」の手順に従うことができます。
お使いの IoT ハブに登録されているデバイス。 IoT ハブにデバイスがない場合は、「デバイスを登録する」の手順に従います。
Python バージョン 3.7 以降をお勧めします。 必ず、セットアップに必要な 32 ビットまたは 64 ビットのインストールを使用してください。 インストール中に求められた場合は、プラットフォーム固有の環境変数に Python を追加します。
シミュレート対象デバイス アプリの作成
このセクションでは、クラウドによって呼び出されたダイレクト メソッドに応答する Python コンソール アプリを作成します。このアプリはシミュレートされた lockDoor メソッドをトリガーします。
重要
この記事では、Shared Access Signature (対称キー認証とも呼ばれます) を使用してデバイスを接続する手順について説明します。 この認証方法はテストと評価には便利ですが、X.509 証明書を使用してデバイスを認証する方が安全なアプローチです。 詳細については、「セキュリティのベスト プラクティス」>「接続のセキュリティ」をご覧ください。
コマンド プロンプトで次のコマンドを実行して azure-iot-device パッケージをインストールします。
pip install azure-iot-device
テキスト エディターを使用して、作業ディレクトリに新しい simDevice.py ファイルを作成します。
simDevice.py ファイルの先頭に、次の
import
ステートメントと変数を追加します。deviceConnectionString
を上記で作成したデバイスの接続文字列に置き換えます。import time from azure.iot.device import IoTHubDeviceClient, MethodResponse CONNECTION_STRING = "{deviceConnectionString}"
次の関数を定義します。この関数によってクライアントがインスタンス化され、lockDoor メソッドに応答し、さらにデバイス ツイン更新を受信するように構成されます。
def create_client(): # Instantiate the client client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING) # Define behavior for responding to the lockDoor direct method def method_request_handler(method_request): if method_request.name == "lockDoor": print("Locking Door!") resp_status = 200 resp_payload = {"Response": "lockDoor called successfully"} method_response = MethodResponse.create_from_method_request( method_request=method_request, status=resp_status, payload=resp_payload ) client.send_method_response(method_response) # Define behavior for receiving a twin patch def twin_patch_handler(twin_patch): print("") print("Twin desired properties patch received:") print(twin_patch) # Set the handlers on the client try: print("Beginning to listen for 'lockDoor' direct method invocations...") client.on_method_request_received = method_request_handler print("Beginning to listen for updates to the Twin desired properties...") client.on_twin_desired_properties_patch_received = twin_patch_handler except: # If something goes wrong while setting the handlers, clean up the client client.shutdown() raise
サンプルを実行する次のコードを追加します。
def main(): print ("Starting the IoT Hub Python jobs sample...") client = create_client() print ("IoTHubDeviceClient waiting for commands, press Ctrl-C to exit") try: while True: time.sleep(100) except KeyboardInterrupt: print("IoTHubDeviceClient sample stopped!") finally: # Graceful exit print("Shutting down IoT Hub Client") client.shutdown() if __name__ == '__main__': main()
simDevice.py ファイルを保存して閉じます。
Note
わかりやすくするために、この記事では再試行ポリシーは実装しません。 運用環境のコードでは、「一時的な障害の処理」の記事で推奨されているように、再試行ポリシー (指数関数的バックオフなど) を実装することをお勧めします。
IoT ハブ接続文字列を取得する
この記事では、デバイス上でダイレクト メソッドを呼び出し、デバイス ツインを更新するバックエンド サービスを作成します。 デバイス上でダイレクト メソッドを呼び出すには、サービスにサービス接続アクセス許可が必要です。 また、このサービスで ID レジストリの読み取りと書き込みを行うために、レジストリ読み取りおよびレジストリ書き込みアクセス許可も必要です。 これらのアクセス許可だけを含んだ既定の共有アクセス ポリシーは存在しないため、共有アクセス ポリシーを独自に作成する必要があります。
サービス接続、レジストリ読み取り、およびレジストリ書き込みのアクセス許可を付与する共有アクセス ポリシーを作成し、そのポリシーの接続文字列を取得するには、次の手順を実行します。
Azure portal で IoT ハブを開きます。 IoT ハブに移動するための最も簡単な方法は、[リソース グループ] を選択し、IoT ハブがあるリソース グループを選択した後、リソースの一覧から目的の IoT ハブを選択することです。
IoT ハブの左側のウィンドウで、 [共有アクセス ポリシー] を選択します。
ポリシー一覧の上にある上部のメニューから、[共有アクセス ポリシーの追加] を選びます。
[共有アクセス ポリシーを追加] ペインで、対象のポリシーのわかりやすい名前を入力します (例: serviceAndRegistryReadWrite)。 [アクセス許可] で、[レジストリ書き込み] と [サービス接続] を選び ([レジストリ書き込み] を選ぶと [レジストリ読み取り] が自動的に選択されます)、次に [追加] を選びます。
[共有アクセス ポリシー] ページに戻り、ポリシーの一覧から新しいポリシーを選びます。
表示される新しいペインで、[プライマリ接続文字列] のコピー アイコンを選び、値を保存します。
IoT Hub の共有アクセス ポリシーとアクセス許可の詳細については、「アクセス制御とアクセス許可」を参照してください。
重要
この記事では、Shared Access Signature を使用してサービスに接続する手順について説明しています。 この認証方法はテストと評価には便利ですが、サービスに対する認証方法としては、Microsoft Entra ID またはマネージド ID を使用する方が安全です。 詳細については、「セキュリティのベスト プラクティス」の「クラウドのセキュリティ」 を参照してください。>
ダイレクト メソッドを呼び出し、デバイス ツインのプロパティを更新するジョブのスケジュール
このセクションでは、ダイレクト メソッドを使用してデバイスでリモート lockDoor を開始する Python コンソール アプリを作成し、さらにデバイス ツインの必要なプロパティを更新します。
コマンド プロンプトで次のコマンドを実行して azure-iot-hub パッケージをインストールします。
pip install azure-iot-hub
テキスト エディターを使用して、作業ディレクトリに新しい scheduleJobService.py ファイルを作成します。
scheduleJobService.py ファイルの先頭に、次の
import
ステートメントと変数を追加します。{IoTHubConnectionString}
プレースホルダーを、先ほど「IoT ハブ接続文字列を取得する」でコピーしておいた IoT ハブ接続文字列に置き換えます。{deviceId}
プレースホルダーを、登録済みデバイスのデバイス ID (名前) に置き換えます。import os import sys import datetime import time import threading import uuid import msrest from azure.iot.hub import IoTHubJobManager, IoTHubRegistryManager from azure.iot.hub.models import JobProperties, JobRequest, Twin, TwinProperties, CloudToDeviceMethod CONNECTION_STRING = "{IoTHubConnectionString}" DEVICE_ID = "{deviceId}" METHOD_NAME = "lockDoor" METHOD_PAYLOAD = "{\"lockTime\":\"10m\"}" UPDATE_PATCH = {"building":43,"floor":3} TIMEOUT = 60 WAIT_COUNT = 5 # Create IoTHubJobManager iothub_job_manager = IoTHubJobManager.from_connection_string(CONNECTION_STRING)
ダイレクト メソッドとデバイス ツインを呼び出すジョブを実行する次のメソッドを追加します。
def device_method_job(job_id, device_id, execution_time): print ( "" ) print ( "Scheduling job: " + str(job_id) ) job_request = JobRequest() job_request.job_id = job_id job_request.type = "scheduleDeviceMethod" job_request.start_time = datetime.datetime.utcnow().isoformat() job_request.cloud_to_device_method = CloudToDeviceMethod(method_name=METHOD_NAME, payload=METHOD_PAYLOAD) job_request.max_execution_time_in_seconds = execution_time job_request.query_condition = "DeviceId in ['{}']".format(device_id) new_job_response = iothub_job_manager.create_scheduled_job(job_id, job_request) def device_twin_job(job_id, device_id, execution_time): print ( "" ) print ( "Scheduling job " + str(job_id) ) job_request = JobRequest() job_request.job_id = job_id job_request.type = "scheduleUpdateTwin" job_request.start_time = datetime.datetime.utcnow().isoformat() job_request.update_twin = Twin(etag="*", properties=TwinProperties(desired=UPDATE_PATCH)) job_request.max_execution_time_in_seconds = execution_time job_request.query_condition = "DeviceId in ['{}']".format(device_id) new_job_response = iothub_job_manager.create_scheduled_job(job_id, job_request)
ジョブをスケジュールし、ジョブの状態を更新する次のコードを追加します。
main
ルーチンも含めます。def iothub_jobs_sample_run(): try: method_job_id = uuid.uuid4() device_method_job(method_job_id, DEVICE_ID, TIMEOUT) print ( "" ) print ( "Direct method called with Job Id: " + str(method_job_id) ) twin_job_id = uuid.uuid4() device_twin_job(twin_job_id, DEVICE_ID, TIMEOUT) print ( "" ) print ( "Device twin called with Job Id: " + str(twin_job_id) ) while True: print ( "" ) method_job_status = iothub_job_manager.get_scheduled_job(method_job_id) print ( "...job " + str(method_job_id) + " " + method_job_status.status ) twin_job_status = iothub_job_manager.get_scheduled_job(twin_job_id) print ( "...job " + str(twin_job_id) + " " + twin_job_status.status ) print ( "Job status posted, press Ctrl-C to exit" ) time.sleep(WAIT_COUNT) except msrest.exceptions.HttpOperationError as ex: print ( "" ) print ( "Http error {}".format(ex.response.text) ) return except Exception as ex: print ( "" ) print ( "Unexpected error {}".format(ex) ) return except KeyboardInterrupt: print ( "" ) print ( "IoTHubService sample stopped" ) if __name__ == '__main__': print ( "Starting the IoT Hub jobs Python sample..." ) print ( " Connection string = {0}".format(CONNECTION_STRING) ) print ( " Device ID = {0}".format(DEVICE_ID) ) iothub_jobs_sample_run()
scheduleJobService.py ファイルを保存して閉じます。
アプリケーションの実行
これで、アプリケーションを実行する準備が整いました。
作業ディレクトリのコマンド プロンプトで、次のコマンドを実行して再起動のダイレクト メソッドのリッスンを開始します。
python simDevice.py
作業ディレクトリのコマンド プロンプトで、次のコマンドを実行して、ドアをロックしてツインを更新するジョブをトリガーします。
python scheduleJobService.py
ダイレクト メソッドとデバイス ツインの更新に対するデバイスの応答がコンソールに表示されます。
次の手順
この記事では、ダイレクト メソッドを実行してデバイス ツインのプロパティを更新するようにジョブをスケジュールしました。
IoT Hub とデバイス管理パターンの調査を続けるには、「Raspberry Pi 3 B+ 参照イメージを使用した Device Update for Azure IoT Hub のチュートリアル」でイメージを更新してください。