パイプラインで並列ジョブを使用する方法 (V2)
適用対象:Azure CLI ml extension v2 (現行)Python SDK azure-ai-ml v2 (現行)
ユーザーは、並列ジョブを使うことで、繰り返されるタスクを強力なマルチノード コンピューティング クラスターに分散させて、ジョブの実行を高速化できます。 たとえば、画像の大きなセットで物体検出モデルを実行しているシナリオを考えます。 Azure Machine Learning 並列ジョブを使うと、画像を簡単に分散させて、特定のコンピューティング クラスターでカスタム コードを並列に実行できます。 並列化により、時間コストが大幅に削減される可能性があります。 また、Azure Machine Learning 並列ジョブを使うと、プロセスを簡素化して自動化し、いっそう効率的にできます。
前提条件
Azure Machine Learning 並列ジョブは、パイプライン ジョブのステップの 1 つとしてのみ使用できます。 そのため、パイプラインの使用について理解しておくことが重要です。 Azure Machine Learning パイプラインの詳細については、以下の記事を参照してください。
- Azure Machine Learning パイプラインとは何かについて理解している。
- CLI v2 と SDK v2 で Azure Machine Learning パイプラインを使用する方法を理解します。
並列ジョブが必要な理由
現実の ML エンジニアは、常に、トレーニングまたは推論タスクに関するスケールの要件を抱えています。 たとえば、データ科学者が売上予測モデルをトレーニングするための 1 つのスクリプトを提供したら、ML エンジニアはこのトレーニング タスクを個々の各ストアに適用する必要があります。 このスケールアウト プロセスでは、次のような課題があります。
- 長い実行時間によって発生する遅延圧力。
- タスクの続行を維持するために、予期しない問題を処理するための手動による介入。
Azure Machine Learning 並列ジョブの核心的価値は、1 つのシリアル タスクをミニバッチに分割し、それらのミニバッチを複数のコンピューティングにディスパッチして並列に実行することです。 並列ジョブを使うと、次のことができます。
- エンド ツー エンドの実行時間を大幅に短縮します。
- Azure Machine Learning 並列ジョブの自動エラー処理設定を使います。
次の場合は、Azure Machine Learning 並列ジョブの使用を検討する必要があります。
- パーティション分割されたデータを使って多数のモデルをトレーニングする予定である。
- 大規模なバッチ推論タスクを高速化したい。
並列ジョブの準備
他の種類のジョブとは異なり、並列ジョブには準備が必要です。 並列ジョブの作成を準備するには、以下のセクションのようにします。
分散させる入力とデータ分割の設定を宣言する
並列ジョブでは、1 つの主要な入力データのみを分割して並列に処理する必要があります。 主要な入力データには、表形式データまたはファイルのセットを使用できます。 入力の種類が異なると、データ分割の方法が異なる場合があります。
次の表では、入力データとデータ分割の方法の関係を示します。
データ形式 | Azure Machine Learning の入力の種類 | Azure Machine Learning の入力モード | データ分割の方法 |
---|---|---|---|
ファイル一覧 | mltable またはuri_folder |
ro_mount または download |
サイズ別 (ファイル数) パーティション別 |
表形式データ" のような式に名前をバインドできます。 | mltable |
直接 | サイズ別 (推定物理サイズ) パーティション別 |
主要な入力データは、並列ジョブの YAML または Python SDK の input_data
属性を使って宣言できます。 また、${{inputs.<input name>}}
を使って、並列ジョブの定義された inputs
の 1 つとそれをバインドできます。 次に、さまざまな属性を入力して、主要な入力のデータ分割方法を定義する必要があります。
データ分割の方法 | 属性名 | 属性の型 | ジョブの例 |
---|---|---|---|
サイズ別 | mini_batch_size | string | アヤメのバッチ予測 |
パーティション別 | partition_keys | 文字列の一覧 | オレンジ ジュースの売上予測 |
適用対象: Azure CLI ML 拡張機能 v2 (現行)
batch_prediction:
type: parallel
compute: azureml:cpu-cluster
inputs:
input_data:
type: mltable
path: ./neural-iris-mltable
mode: direct
score_model:
type: uri_folder
path: ./iris-model
mode: download
outputs:
job_output_file:
type: uri_file
mode: rw_mount
input_data: ${{inputs.input_data}}
mini_batch_size: "10kb"
resources:
instance_count: 2
max_concurrency_per_instance: 2
logging_level: "DEBUG"
mini_batch_error_threshold: 5
retry_settings:
max_retries: 2
timeout: 60
データ分割設定を定義したら、次の 2 つの属性を入力して、並列化のリソースの数を構成できます。
属性名 | タイプ | 説明 | 既定値 |
---|---|---|---|
instance_count |
整数 (integer) | ジョブに使用するノードの数。 | 1 |
max_concurrency_per_instance |
整数 (integer) | 各ノードのプロセッサの数。 | GPU コンピューティングの場合、既定値は 1 です。 CPU コンピューティングの場合、既定値はコアの数です。 |
これら 2 つの属性は、指定したコンピューティング クラスターと連携します。
2 つの属性を設定するサンプル コード:
適用対象: Azure CLI ML 拡張機能 v2 (現行)
batch_prediction:
type: parallel
compute: azureml:cpu-cluster
inputs:
input_data:
type: mltable
path: ./neural-iris-mltable
mode: direct
score_model:
type: uri_folder
path: ./iris-model
mode: download
outputs:
job_output_file:
type: uri_file
mode: rw_mount
input_data: ${{inputs.input_data}}
mini_batch_size: "10kb"
resources:
instance_count: 2
max_concurrency_per_instance: 2
logging_level: "DEBUG"
mini_batch_error_threshold: 5
retry_settings:
max_retries: 2
timeout: 60
注意
主要な入力データとして表形式の mltable
を使う場合は、transformations - read_delimited
セクションが指定された MLTABLE 仕様ファイルを特定のパスの下に置く必要があります。 その他の例については、MLTABLE データ資産の作成に関する記事をご覧ください。
エントリ スクリプトで定義済みの関数を実装する
エントリ スクリプトは 1 つの Python ファイルであり、ユーザーはカスタム コードを使って 3 つの定義済み関数を実装する必要があります。 Azure Machine Learning 並列ジョブは、次の図のように各プロセッサでそれらを実行します。
関数名 | 必須 | Description | 入力 | 戻り値 |
---|---|---|---|---|
Init() | Y | この関数は、ミニバッチの実行を始める前の一般的な準備に使います。 たとえば、これを使って、モデルをグローバル オブジェクトに読み込みます。 | -- | -- |
Run(mini_batch) | Y | mini_batches 用のメイン実行ロジックを実装します。 | mini_batch: 入力データが表形式データの場合は、Pandas データフレーム。 入力データがディレクトリの場合は、ファイル パスのリスト。 |
データフレーム、リスト、またはタプル。 |
Shutdown() | N | コンピューティングをプールに戻す前にカスタム クリーンアップを実行する省略可能な関数。 | -- | -- |
詳しくは、次のエントリ スクリプトの例を確認してください。
エントリ スクリプトの準備ができたら、次の 2 つの属性を設定して、並列ジョブで使用できます。
属性名 | タイプ | 説明 | 既定値 |
---|---|---|---|
code |
string | アップロードしてジョブに使用するソース コード ディレクトリへのローカル パス。 | |
entry_script |
string | 定義済みの並列関数の実装を含む Python ファイル。 |
2 つの属性を設定するサンプル コード:
適用対象: Azure CLI ML 拡張機能 v2 (現行)
batch_prediction:
type: parallel
compute: azureml:cpu-cluster
inputs:
input_data:
type: mltable
path: ./neural-iris-mltable
mode: direct
score_model:
type: uri_folder
path: ./iris-model
mode: download
outputs:
job_output_file:
type: uri_file
mode: rw_mount
input_data: ${{inputs.input_data}}
mini_batch_size: "10kb"
resources:
instance_count: 2
max_concurrency_per_instance: 2
logging_level: "DEBUG"
mini_batch_error_threshold: 5
retry_settings:
max_retries: 2
timeout: 60
task:
type: run_function
code: "./script"
entry_script: iris_prediction.py
environment:
name: "prs-env"
version: 1
image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
conda_file: ./environment/environment_parallel.yml
program_arguments: >-
--model ${{inputs.score_model}}
--error_threshold 5
--allowed_failed_percent 30
--task_overhead_timeout 1200
--progress_update_timeout 600
--first_task_creation_timeout 600
--copy_logs_to_parent True
--resource_monitor_interva 20
append_row_to: ${{outputs.job_output_file}}
重要
Run(mini_batch) 関数は、データフレーム、リスト、またはタプル項目のいずれかを返す必要があります。 並列ジョブでは、その戻り値の数を使って、そのミニバッチで成功した項目の数を測定します。 すべての項目がこのミニバッチで適切に処理されている場合、理想的にはミニバッチ数が戻されたリストの数と等しくなるばずです。
重要
Init() または Run(mini_batch) 関数の引数を解析する場合は、例外を回避するために "parse_args" の代わりに "parse_known_args" を使います。 引数パーサーを含むエントリ スクリプトについては、、iris_score の例をご覧ください。
重要
主要な入力データとして mltable
を使用する場合は、環境に 'mltable' ライブラリをインストールする必要があります。 この conda ファイルの例の 9 行目を参照してください。
自動化の設定を検討する
Azure Machine Learning 並列ジョブでは、手動による介入なしにジョブを自動的に制御するための多数の設定が公開されています。 詳しくは、次の表をご覧ください。
キー | Type | 説明 | 使用できる値 | 既定値 | 属性で設定する | プログラム引数で設定する |
---|---|---|---|---|---|---|
ミニバッチ エラーしきい値 | 整数 (integer) | この並列ジョブで無視できる失敗したミニバッチの数を定義します。 失敗したミニバッチの数がこのしきい値を超えた場合、並列ジョブは失敗としてマークされます。 ミニバッチは、次の場合に失敗としてマークされます。 - run() からの戻り値の数がミニバッチの入力数未満です。 - カスタム run() コードで例外をキャッチしました。 既定値は "-1" で、並列ジョブの間に失敗したすべてのミニバッチを無視することを意味します。 |
[-1, int.max] | -1 | mini_batch_error_threshold | 該当なし |
ミニバッチの最大再試行回数 | 整数 (integer) | ミニバッチが失敗またはタイムアウトしたときの再試行回数を定義します。 すべての再試行が失敗した場合、そのミニバッチは失敗とマークされて mini_batch_error_threshold の計算でカウントされます。 |
[0, int.max] | 2 | retry_settings.max_retries | 該当なし |
ミニバッチ タイムアウト | 整数 (integer) | カスタム run() 関数の実行でのタイムアウトを秒単位で定義します。 実行時間がこのしきい値を超えた場合、そのミニバッチは中止され、失敗とマークされて再試行がトリガーされます。 | (0, 259200] | 60 | retry_settings.timeout | 該当なし |
項目エラーしきい値 | 整数 (integer) | 失敗した項目数のしきい値。 失敗した項目の数は、入力数と各ミニバッチから返された数の間のギャップによってカウントされます。 失敗した項目の合計がこのしきい値を超えた場合、並列ジョブは失敗としてマークされます。 注: 既定値は "-1" で、並列ジョブの間の失敗をすべて無視することを意味します。 |
[-1, int.max] | -1 | N/A | --error_threshold |
許容される失敗の割合 | 整数 (integer) | mini_batch_error_threshold に似ていますが、カウントではなく失敗したミニバッチの割合を使います。 |
[0, 100] | 100 | 該当なし | --allowed_failed_percent |
オーバーヘッド タイムアウト | 整数 (integer) | 各ミニバッチの初期化のタイムアウト (秒)。 たとえば、ミニバッチのデータを読み込んで run() 関数に渡します。 | (0, 259200] | 600 | 該当なし | --task_overhead_timeout |
進行状況更新タイムアウト | 整数 (integer) | ミニバッチの実行の進行状況を監視するためのタイムアウト (秒)。 このタイムアウト設定内に進行状況の更新を受け取らない場合、並列ジョブは失敗としてマークされます。 | (0, 259200] | 他の設定によって動的に計算されます。 | 該当なし | --progress_update_timeout |
最初のタスク作成のタイムアウト | 整数 (integer) | ジョブの開始から最初のミニバッチの実行までの時間を監視するためのタイムアウト (秒)。 | (0, 259200] | 600 | 該当なし | --first_task_creation_timeout |
ログ レベル | string | ユーザー ログ ファイルにダンプされるログのレベルを定義します。 | INFO、WARNING、または DEBUG | INFO | logging_level | 該当なし |
行の追加先 | string | ミニバッチの各実行から戻されたすべての値を集約して、このファイルに出力します。 ${{outputs.<output_name>}} というを使って、並列ジョブの出力の 1 つを参照できます | task.append_row_to | 該当なし | ||
ログを親にコピーする | string | ジョブの進行状況、概要、ログを親パイプライン ジョブにコピーするかどうかを示すブール値オプション。 | True または False | False | 該当なし | --copy_logs_to_parent |
リソース モニターの間隔 | 整数 (integer) | ノード リソースの使用状況 (CPU、メモリなど) を "logs/sys/perf" パスの下のログ フォルダーにダンプする時間間隔 (秒)。 注: リソース ログを頻繁にダンプすると、ミニバッチの実行速度が若干遅くなります。 リソース使用状況のダンプを停止するには、この値を "0" に設定します。 |
[0, int.max] | 600 | 該当なし | --resource_monitor_interval |
これらの設定を更新するサンプル コード:
適用対象: Azure CLI ML 拡張機能 v2 (現行)
batch_prediction:
type: parallel
compute: azureml:cpu-cluster
inputs:
input_data:
type: mltable
path: ./neural-iris-mltable
mode: direct
score_model:
type: uri_folder
path: ./iris-model
mode: download
outputs:
job_output_file:
type: uri_file
mode: rw_mount
input_data: ${{inputs.input_data}}
mini_batch_size: "10kb"
resources:
instance_count: 2
max_concurrency_per_instance: 2
logging_level: "DEBUG"
mini_batch_error_threshold: 5
retry_settings:
max_retries: 2
timeout: 60
task:
type: run_function
code: "./script"
entry_script: iris_prediction.py
environment:
name: "prs-env"
version: 1
image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
conda_file: ./environment/environment_parallel.yml
program_arguments: >-
--model ${{inputs.score_model}}
--error_threshold 5
--allowed_failed_percent 30
--task_overhead_timeout 1200
--progress_update_timeout 600
--first_task_creation_timeout 600
--copy_logs_to_parent True
--resource_monitor_interva 20
append_row_to: ${{outputs.job_output_file}}
パイプラインで並列ジョブを作成する
適用対象: Azure CLI ML 拡張機能 v2 (現行)
パイプライン ジョブを使用して並列ジョブをインラインで作成できます。
$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline
display_name: iris-batch-prediction-using-parallel
description: The hello world pipeline job with inline parallel job
tags:
tag: tagvalue
owner: sdkteam
settings:
default_compute: azureml:cpu-cluster
jobs:
batch_prediction:
type: parallel
compute: azureml:cpu-cluster
inputs:
input_data:
type: mltable
path: ./neural-iris-mltable
mode: direct
score_model:
type: uri_folder
path: ./iris-model
mode: download
outputs:
job_output_file:
type: uri_file
mode: rw_mount
input_data: ${{inputs.input_data}}
mini_batch_size: "10kb"
resources:
instance_count: 2
max_concurrency_per_instance: 2
logging_level: "DEBUG"
mini_batch_error_threshold: 5
retry_settings:
max_retries: 2
timeout: 60
task:
type: run_function
code: "./script"
entry_script: iris_prediction.py
environment:
name: "prs-env"
version: 1
image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
conda_file: ./environment/environment_parallel.yml
program_arguments: >-
--model ${{inputs.score_model}}
--error_threshold 5
--allowed_failed_percent 30
--task_overhead_timeout 1200
--progress_update_timeout 600
--first_task_creation_timeout 600
--copy_logs_to_parent True
--resource_monitor_interva 20
append_row_to: ${{outputs.job_output_file}}
パイプライン ジョブを送信し、スタジオの UI で並列ステップを確認する
適用対象: Azure CLI ML 拡張機能 v2 (現行)
CLI コマンドを使うことにより、並列ステップでパイプライン ジョブを送信できます。
az ml job create --file pipeline.yml
パイプライン ジョブを送信すると、SDK または CLI のウィジェットに、スタジオ UI への Web URL リンクが表示されます。 このリンクを使用すると、既定ではパイプライン グラフ ビューにアクセスできます。 並列ステップをダブルクリックして、並列ジョブの右側のパネルを開きます。
並列ジョブの設定を確認するには、[パラメーター] タブに移動し、[実行設定] を展開して、[並列] セクションを調べます。
並列ジョブの失敗をデバッグするには、[出力とログ] タブに移動し、左側の出力ディレクトリの logs フォルダーを展開し、job_result.txt を調べて、並列ジョブが失敗した理由を把握します。 並列ジョブのログの構造について詳しくは、同じフォルダーにある readme.txt をご覧ください。
パイプラインでの並列ジョブの例
次のステップ
- 並列ジョブの YAML スキーマについて詳しくは、並列ジョブの YAML リファレンスをご覧ください。
- データを MLTABLE にオンボードする方法については、MLTABLE データ資産の作成に関する記事をご覧ください。
- パイプラインを定期的にトリガーする方法については、パイプラインのスケジュール方法に関する記事をご覧ください。
フィードバック
https://aka.ms/ContentUserFeedback」を参照してください。
以下は間もなく提供いたします。2024 年を通じて、コンテンツのフィードバック メカニズムとして GitHub の issue を段階的に廃止し、新しいフィードバック システムに置き換えます。 詳細については、「フィードバックの送信と表示