並列実行ステップを SDK v2 にアップグレードする

SDK v2 では、"並列実行ステップ" は parallel jobとしてジョブの概念に統合されます。 ユーザーは、並列ジョブを使うことで、同じターゲットを維持し、繰り返されるタスクを強力なマルチノード コンピューティング クラスターに分散させて、ジョブの実行を高速化できます。 並列実行ステップの上に、v2 並列ジョブには追加のベネフィットがあります:

  • 柔軟なインターフェイス。これにより、ユーザーは並列ジョブの複数のカスタム入力と出力を定義できます。 他のステップに接続して、エントリ スクリプトでコンテンツを使用または管理できます
  • v2 data asset の概念を使用して入力としてDatasetを置き換える入力スキーマを簡略化します。 ローカル ファイルまたは BLOB ディレクトリ URI を並列ジョブへの入力として簡単に使用できます。
  • より強力なフィーチャーは、v2 並列ジョブでのみ開発されています。 たとえば、失敗または取り消された並列ジョブを再開し、成功した結果を再利用して重複する作業量を節約することで、失敗または未処理のミニバッチのプロセス処理を続行します。

現在の sdk v1 並列実行ステップを v2 にアップグレードするには、次の手順を実行する必要があります

注: ユーザー 入力スクリプト は、v1 並列実行ステップと v2 並列ジョブの間で互換性があります。 そのため、並列実行ジョブを移行するときに、同じ entry_script.py を使用し続けることができます。

この記事では、SDK v1 と SDK v2 のシナリオの比較を示します。 次の例では、パイプライン ジョブ内の入力データを予測する並列ジョブをビルドします。 並列ジョブをビルドする方法と、SDK v1 と SDK v2 の両方のパイプライン ジョブでそれを使用する方法について説明します。

前提条件

並列ステップの作成

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

パイプラインで並列ステップを使用する

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

SDK v1 と SDK v2 の主要機能のマッピング

SDK v1 の機能 SDK v2 での大まかなマッピング
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig 出力
dataset as_mount 入力

並列ジョブの構成と設定のマッピング

SDK v1 SDK v2 説明
ParallelRunConfig.environment parallel_run_function.task.environment トレーニング ジョブが実行される環境。
ParallelRunConfig.entry_script parallel_run_function.task.entry_script 複数のノードで並列に実行されるユーザー スクリプト。
ParallelRunConfig.error_threshold parallel_run_function.error_threshold この並列ジョブで無視できる失敗したミニバッチの数。 失敗したミニバッチの数がこのしきい値を超えた場合、並列ジョブは失敗としてマークされます。

既定値は "-1" で、並列ジョブの間に失敗したすべてのミニバッチを無視することを意味します。
ParallelRunConfig.output_action parallel_run_function.append_row_to ミニバッチの各実行から戻されたすべての値を集約して、このファイルに出力します。 ${{outputs.<output_name>}} というを使って、並列ジョブの出力の 1 つを参照できます
ParallelRunConfig.node_count parallel_run_function.instance_count コンピューティング 先で使用されるインスタンスまたはノードの数 (省略可能)。 既定値は 1 です。
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance 各コンピューティング インスタンスが持つ最大並列処理。
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size 入力を分割する各ミニバッチのサイズを定義します。

input_dataがフォルダーまたはファイルのセットである場合、この番号は各ミニバッチのファイル数を定義します。 たとえば、10、100 などです。

input_dataがmltableからの表形式のデータ の場合、この数値は各ミニバッチの近接物理サイズを定義します。 既定の単位は Byte で、値は 100 kb、100 mb などの文字列を受け取ります。
ParallelRunConfig.source_directory parallel_run_function.task.code ソース コードを指すローカルパスまたはリモート パス。
ParallelRunConfig.description parallel_run_function.description 並行プログラムのわかりやすい説明です
ParallelRunConfig.logging_level parallel_run_function.logging_level ログ レベル名の文字列。'logging' で定義されます。 指定可能な値は、'WARNING'、'INFO'、'DEBUG' です。 (省略可能。既定値は 'INFO' です。)この値は PipelineParameter を使用して設定できます。
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout カスタム run() 関数の実行での秒単位のタイムアウト。 実行時間がこのしきい値を超えた場合、そのミニバッチは中止され、失敗とマークされて再試行がトリガーされます。
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries ミニバッチが失敗またはタイムアウトしたときの再試行回数。 すべての再試行が失敗した場合、そのミニバッチは失敗とマークされて mini_batch_error_しきい値 の計算でカウントされます。
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to append_row_to設定と 組み合わせる。
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold この並列ジョブで無視できる失敗したミニバッチの数。 失敗したミニバッチの数がこのしきい値を超えた場合、並列ジョブは失敗としてマークされます。

既定値は "-1" で、並列ジョブの間に失敗したすべてのミニバッチを無視することを意味します。
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments set
--allowed_failed_percent
"allowed_failed_count"に似ているが、この設定はミニバッチの失敗アカウントの代わりに失敗したミニバッチのパーセントを使用します。

この設定の範囲は [0, 100] です。 既定値は "100" で、並列ジョブの間に失敗したすべてのミニバッチを無視することを意味します。
ParallelRunConfig.partition_keys 開発中。
ParallelRunConfig.environment_variables parallel_run_function.environment_variables 環境変数の名前と値のディクショナリ。 これらの環境変数は、ユーザー スクリプトが実行されるプロセスで設定されます。
ParallelRunStep.name parallel_run_function.name 作成された並列ジョブまたはコンポーネントの名前。
ParallelRunStep.inputs parallel_run_function.inputs この並列で使用される入力のディクテーション。
-- parallel_run_function.input_data 並列で分割および処理するデータを宣言する
ParallelRunStep.output parallel_run_function.outputs この並列ジョブの出力。
ParallelRunStep.side_inputs parallel_run_function.inputs inputsと共に定義されます。
ParallelRunStep.arguments parallel_run_function.task.program_arguments 並列タスクの引数。
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic 並列が同じ入力を指定して同じ出力を返すかどうかを指定します。

次のステップ

詳しくは、こちらのドキュメントをご覧ください。