次の方法で共有


パイプラインで並列ジョブを使用する方法 (V2)

適用対象:Azure CLI ml extension v2 (現行)Python SDK azure-ai-ml v2 (現行)

ユーザーは、並列ジョブを使うことで、繰り返されるタスクを強力なマルチノード コンピューティング クラスターに分散させて、ジョブの実行を高速化できます。 たとえば、画像の大きなセットで物体検出モデルを実行しているシナリオを考えます。 Azure Machine Learning 並列ジョブを使うと、画像を簡単に分散させて、特定のコンピューティング クラスターでカスタム コードを並列に実行できます。 並列化により、時間コストが大幅に削減される可能性があります。 また、Azure Machine Learning 並列ジョブを使うと、プロセスを簡素化して自動化し、いっそう効率的にできます。

前提条件

Azure Machine Learning 並列ジョブは、パイプライン ジョブのステップの 1 つとしてのみ使用できます。 そのため、パイプラインの使用について理解しておくことが重要です。 Azure Machine Learning パイプラインの詳細については、以下の記事を参照してください。

並列ジョブが必要な理由

現実の ML エンジニアは、常に、トレーニングまたは推論タスクに関するスケールの要件を抱えています。 たとえば、データ科学者が売上予測モデルをトレーニングするための 1 つのスクリプトを提供したら、ML エンジニアはこのトレーニング タスクを個々の各ストアに適用する必要があります。 このスケールアウト プロセスでは、次のような課題があります。

  • 長い実行時間によって発生する遅延圧力。
  • タスクの続行を維持するために、予期しない問題を処理するための手動による介入。

Azure Machine Learning 並列ジョブの核心的価値は、1 つのシリアル タスクをミニバッチに分割し、それらのミニバッチを複数のコンピューティングにディスパッチして並列に実行することです。 並列ジョブを使うと、次のことができます。

  • エンド ツー エンドの実行時間を大幅に短縮します。
  • Azure Machine Learning 並列ジョブの自動エラー処理設定を使います。

次の場合は、Azure Machine Learning 並列ジョブの使用を検討する必要があります。

  • パーティション分割されたデータを使って多数のモデルをトレーニングする予定である。
  • 大規模なバッチ推論タスクを高速化したい。

並列ジョブの準備

他の種類のジョブとは異なり、並列ジョブには準備が必要です。 並列ジョブの作成を準備するには、以下のセクションのようにします。

分散させる入力とデータ分割の設定を宣言する

並列ジョブでは、1 つの主要な入力データのみを分割して並列に処理する必要があります。 主要な入力データには、表形式データまたはファイルのセットを使用できます。 入力の種類が異なると、データ分割の方法が異なる場合があります。

次の表では、入力データとデータ分割の方法の関係を示します。

データ形式 Azure Machine Learning の入力の種類 Azure Machine Learning の入力モード データ分割の方法
ファイル一覧 mltable または
uri_folder
ro_mount または
download
サイズ別 (ファイル数)
パーティション別
表形式データ" のような式に名前をバインドできます。 mltable 直接 サイズ別 (推定物理サイズ)
パーティション別

主要な入力データは、並列ジョブの YAML または Python SDK の input_data 属性を使って宣言できます。 また、${{inputs.<input name>}} を使って、並列ジョブの定義された inputs の 1 つとそれをバインドできます。 次に、さまざまな属性を入力して、主要な入力のデータ分割方法を定義する必要があります。

データ分割の方法 属性名 属性の型 ジョブの例
サイズ別 mini_batch_size string アヤメのバッチ予測
パーティション別 partition_keys 文字列の一覧 オレンジ ジュースの売上予測

適用対象: Azure CLI ML 拡張機能 v2 (現行)

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

データ分割設定を定義したら、次の 2 つの属性を入力して、並列化のリソースの数を構成できます。

属性名 タイプ 説明 既定値
instance_count 整数 (integer) ジョブに使用するノードの数。 1
max_concurrency_per_instance 整数 (integer) 各ノードのプロセッサの数。 GPU コンピューティングの場合、既定値は 1 です。
CPU コンピューティングの場合、既定値はコアの数です。

これら 2 つの属性は、指定したコンピューティング クラスターと連携します。

並列ジョブでの分散データの動作を示す図。

2 つの属性を設定するサンプル コード:

適用対象: Azure CLI ML 拡張機能 v2 (現行)

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

注意

主要な入力データとして表形式の mltable を使う場合は、transformations - read_delimited セクションが指定された MLTABLE 仕様ファイルを特定のパスの下に置く必要があります。 その他の例については、MLTABLE データ資産の作成に関する記事をご覧ください。

エントリ スクリプトで定義済みの関数を実装する

エントリ スクリプトは 1 つの Python ファイルであり、ユーザーはカスタム コードを使って 3 つの定義済み関数を実装する必要があります。 Azure Machine Learning 並列ジョブは、次の図のように各プロセッサでそれらを実行します。

並列ジョブでのエントリ スクリプトの動作を示す図。

関数名 必須 Description 入力 戻り値
Init() Y この関数は、ミニバッチの実行を始める前の一般的な準備に使います。 たとえば、これを使って、モデルをグローバル オブジェクトに読み込みます。 -- --
Run(mini_batch) Y mini_batches 用のメイン実行ロジックを実装します。 mini_batch:
入力データが表形式データの場合は、Pandas データフレーム。
入力データがディレクトリの場合は、ファイル パスのリスト。
データフレーム、リスト、またはタプル。
Shutdown() N コンピューティングをプールに戻す前にカスタム クリーンアップを実行する省略可能な関数。 -- --

詳しくは、次のエントリ スクリプトの例を確認してください。

エントリ スクリプトの準備ができたら、次の 2 つの属性を設定して、並列ジョブで使用できます。

属性名 タイプ 説明 既定値
code string アップロードしてジョブに使用するソース コード ディレクトリへのローカル パス。
entry_script string 定義済みの並列関数の実装を含む Python ファイル。

2 つの属性を設定するサンプル コード:

適用対象: Azure CLI ML 拡張機能 v2 (現行)

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

  task:
    type: run_function
    code: "./script"
    entry_script: iris_prediction.py
    environment:
      name: "prs-env"
      version: 1
      image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
      conda_file: ./environment/environment_parallel.yml
    program_arguments: >-
      --model ${{inputs.score_model}}
      --error_threshold 5
      --allowed_failed_percent 30
      --task_overhead_timeout 1200
      --progress_update_timeout 600
      --first_task_creation_timeout 600
      --copy_logs_to_parent True
      --resource_monitor_interva 20
    append_row_to: ${{outputs.job_output_file}}

重要

Run(mini_batch) 関数は、データフレーム、リスト、またはタプル項目のいずれかを返す必要があります。 並列ジョブでは、その戻り値の数を使って、そのミニバッチで成功した項目の数を測定します。 すべての項目がこのミニバッチで適切に処理されている場合、理想的にはミニバッチ数が戻されたリストの数と等しくなるばずです。

重要

Init() または Run(mini_batch) 関数の引数を解析する場合は、例外を回避するために "parse_args" の代わりに "parse_known_args" を使います。 引数パーサーを含むエントリ スクリプトについては、、iris_score の例をご覧ください。

重要

主要な入力データとして mltable を使用する場合は、環境に 'mltable' ライブラリをインストールする必要があります。 この conda ファイルの例の 9 行目を参照してください。

自動化の設定を検討する

Azure Machine Learning 並列ジョブでは、手動による介入なしにジョブを自動的に制御するための多数の設定が公開されています。 詳しくは、次の表をご覧ください。

キー Type 説明 使用できる値 既定値 属性で設定する プログラム引数で設定する
ミニバッチ エラーしきい値 整数 (integer) この並列ジョブで無視できる失敗したミニバッチの数を定義します。 失敗したミニバッチの数がこのしきい値を超えた場合、並列ジョブは失敗としてマークされます。

ミニバッチは、次の場合に失敗としてマークされます。
- run() からの戻り値の数がミニバッチの入力数未満です。
- カスタム run() コードで例外をキャッチしました。

既定値は "-1" で、並列ジョブの間に失敗したすべてのミニバッチを無視することを意味します。
[-1, int.max] -1 mini_batch_error_threshold 該当なし
ミニバッチの最大再試行回数 整数 (integer) ミニバッチが失敗またはタイムアウトしたときの再試行回数を定義します。 すべての再試行が失敗した場合、そのミニバッチは失敗とマークされて mini_batch_error_threshold の計算でカウントされます。 [0, int.max] 2 retry_settings.max_retries 該当なし
ミニバッチ タイムアウト 整数 (integer) カスタム run() 関数の実行でのタイムアウトを秒単位で定義します。 実行時間がこのしきい値を超えた場合、そのミニバッチは中止され、失敗とマークされて再試行がトリガーされます。 (0, 259200] 60 retry_settings.timeout 該当なし
項目エラーしきい値 整数 (integer) 失敗した項目数のしきい値。 失敗した項目の数は、入力数と各ミニバッチから返された数の間のギャップによってカウントされます。 失敗した項目の合計がこのしきい値を超えた場合、並列ジョブは失敗としてマークされます。

注: 既定値は "-1" で、並列ジョブの間の失敗をすべて無視することを意味します。
[-1, int.max] -1 N/A --error_threshold
許容される失敗の割合 整数 (integer) mini_batch_error_threshold に似ていますが、カウントではなく失敗したミニバッチの割合を使います。 [0, 100] 100 該当なし --allowed_failed_percent
オーバーヘッド タイムアウト 整数 (integer) 各ミニバッチの初期化のタイムアウト (秒)。 たとえば、ミニバッチのデータを読み込んで run() 関数に渡します。 (0, 259200] 600 該当なし --task_overhead_timeout
進行状況更新タイムアウト 整数 (integer) ミニバッチの実行の進行状況を監視するためのタイムアウト (秒)。 このタイムアウト設定内に進行状況の更新を受け取らない場合、並列ジョブは失敗としてマークされます。 (0, 259200] 他の設定によって動的に計算されます。 該当なし --progress_update_timeout
最初のタスク作成のタイムアウト 整数 (integer) ジョブの開始から最初のミニバッチの実行までの時間を監視するためのタイムアウト (秒)。 (0, 259200] 600 該当なし --first_task_creation_timeout
ログ レベル string ユーザー ログ ファイルにダンプされるログのレベルを定義します。 INFO、WARNING、または DEBUG INFO logging_level 該当なし
行の追加先 string ミニバッチの各実行から戻されたすべての値を集約して、このファイルに出力します。 ${{outputs.<output_name>}} というを使って、並列ジョブの出力の 1 つを参照できます task.append_row_to 該当なし
ログを親にコピーする string ジョブの進行状況、概要、ログを親パイプライン ジョブにコピーするかどうかを示すブール値オプション。 True または False False 該当なし --copy_logs_to_parent
リソース モニターの間隔 整数 (integer) ノード リソースの使用状況 (CPU、メモリなど) を "logs/sys/perf" パスの下のログ フォルダーにダンプする時間間隔 (秒)。

注: リソース ログを頻繁にダンプすると、ミニバッチの実行速度が若干遅くなります。 リソース使用状況のダンプを停止するには、この値を "0" に設定します。
[0, int.max] 600 該当なし --resource_monitor_interval

これらの設定を更新するサンプル コード:

適用対象: Azure CLI ML 拡張機能 v2 (現行)

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

  task:
    type: run_function
    code: "./script"
    entry_script: iris_prediction.py
    environment:
      name: "prs-env"
      version: 1
      image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
      conda_file: ./environment/environment_parallel.yml
    program_arguments: >-
      --model ${{inputs.score_model}}
      --error_threshold 5
      --allowed_failed_percent 30
      --task_overhead_timeout 1200
      --progress_update_timeout 600
      --first_task_creation_timeout 600
      --copy_logs_to_parent True
      --resource_monitor_interva 20
    append_row_to: ${{outputs.job_output_file}}

パイプラインで並列ジョブを作成する

適用対象: Azure CLI ML 拡張機能 v2 (現行)

パイプライン ジョブを使用して並列ジョブをインラインで作成できます。

$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline

display_name: iris-batch-prediction-using-parallel
description: The hello world pipeline job with inline parallel job
tags:
  tag: tagvalue
  owner: sdkteam

settings:
  default_compute: azureml:cpu-cluster

jobs:
  batch_prediction:
    type: parallel
    compute: azureml:cpu-cluster
    inputs:
      input_data: 
        type: mltable
        path: ./neural-iris-mltable
        mode: direct
      score_model: 
        type: uri_folder
        path: ./iris-model
        mode: download
    outputs:
      job_output_file:
        type: uri_file
        mode: rw_mount

    input_data: ${{inputs.input_data}}
    mini_batch_size: "10kb"
    resources:
        instance_count: 2
    max_concurrency_per_instance: 2

    logging_level: "DEBUG"
    mini_batch_error_threshold: 5
    retry_settings:
      max_retries: 2
      timeout: 60

    task:
      type: run_function
      code: "./script"
      entry_script: iris_prediction.py
      environment:
        name: "prs-env"
        version: 1
        image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
        conda_file: ./environment/environment_parallel.yml
      program_arguments: >-
        --model ${{inputs.score_model}}
        --error_threshold 5
        --allowed_failed_percent 30
        --task_overhead_timeout 1200
        --progress_update_timeout 600
        --first_task_creation_timeout 600
        --copy_logs_to_parent True
        --resource_monitor_interva 20
      append_row_to: ${{outputs.job_output_file}}

パイプライン ジョブを送信し、スタジオの UI で並列ステップを確認する

適用対象: Azure CLI ML 拡張機能 v2 (現行)

CLI コマンドを使うことにより、並列ステップでパイプライン ジョブを送信できます。

az ml job create --file pipeline.yml

パイプライン ジョブを送信すると、SDK または CLI のウィジェットに、スタジオ UI への Web URL リンクが表示されます。 このリンクを使用すると、既定ではパイプライン グラフ ビューにアクセスできます。 並列ステップをダブルクリックして、並列ジョブの右側のパネルを開きます。

並列ジョブの設定を確認するには、[パラメーター] タブに移動し、[実行設定] を展開して、[並列] セクションを調べます。

並列ジョブの設定を示す Azure Machine Learning スタジオの [ジョブ] タブのスクリーンショット。

並列ジョブの失敗をデバッグするには、[出力とログ] タブに移動し、左側の出力ディレクトリの logs フォルダーを展開し、job_result.txt を調べて、並列ジョブが失敗した理由を把握します。 並列ジョブのログの構造について詳しくは、同じフォルダーにある readme.txt をご覧ください。

並列ジョブの結果を示す Azure Machine Learning スタジオの [ジョブ] タブのスクリーンショット。

パイプラインでの並列ジョブの例

次のステップ