將平行執行步驟升級至 SDK v2

在 SDK v2 中,「平行執行步驟」會以 parallel job 合併成作業概念。 平行作業會保留相同的目標,讓使用者能夠在功能強大的多節點計算叢集上發佈重複的工作,以加速其作業執行。 在平行執行步驟上,v2 平行作業提供額外的優點:

  • 彈性介面,可讓使用者為平行作業定義多個自訂輸入和輸出。 您可以將這些介面與其他步驟連線,以取用或管理輸入腳本中的內容
  • 簡化輸入結構描述,其使用 v2 Dataset 概念取代 data asset 做為輸入。 您可以輕鬆地使用本機檔案或 Blob 目錄 URI 做為平行作業的輸入。
  • 更強大的功能只會在 v2 平行作業中開發。 例如,藉由重複使用成功的結果來節省重複的工作,以繼續處理失敗或未處理的迷你批次,以繼續失敗/取消的平行作業。

若要將目前的 sdk v1 平行執行步驟升級至 v2,您必須

  • 使用 parallel_run_function,以藉由在 v1 中取代 ParallelRunConfigParallelRunStep 來建立平行作業。
  • 將您的 v1 管線升級至 v2。 然後叫用 v2 平行作業做為 v2 管線中的步驟。 如需管線升級的詳細資料,請參閱 如何將管線從 v1 升級至 v2

注意:使用者輸入腳本在 v1 平行執行步驟與 v2 平行作業之間相容。 因此,當您升級平行執行作業時,您可以繼續使用相同的 entry_script.py。

本文提供在 SDK v1 和 SDK v2 中案例的比較。 在下列範例中,我們將建置平行作業,以預測管線作業中的輸入資料。 您將了解如何建置平行作業,以及如何在 SDK v1 和 SDK v2 的管線作業中使用。

必要條件

建立平行步驟

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

在管線中使用平行步驟

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

SDK v1 和 SDK v2 中主要功能的對應

SDK v1 中的功能 SDK v2 中的粗略對應
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig 輸出
dataset as_mount 輸入

平行作業組態和設定對應

SDK v1 SDK v2 描述
ParallelRunConfig.environment parallel_run_function.task.environment 定型作業將在其中執行的環境。
ParallelRunConfig.entry_script parallel_run_function.task.entry_script 將會在多個節點上平行執行的使用者指令碼。
ParallelRunConfig.error_threshold parallel_run_function.error_threshold 在此平行作業中可忽略的失敗迷你批次數目。 如果失敗的迷你批次計數高於此臨界值,平行作業將會標示為失敗。

「-1」是預設數位,這表示在平行作業期間忽略所有失敗的迷你批次。
ParallelRunConfig.output_action parallel_run_function.append_row_to 彙總每個迷你批次回合的所有傳回,並將其輸出到此檔案中。 可以使用運算式 ${{outputs.<output_name>}} 參考其中一個平行作業輸出
ParallelRunConfig.node_count parallel_run_function.instance_count 計算目標所使用的選用執行個體或節點數目。 預設值為 1。
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance 每個計算執行個體都有的最大平行處理原則。
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size 定義每個迷你批次的大小以分割輸入。

如果 input_data 是資料夾或一組檔案,則該數字會定義每個迷你批次的檔案計數。 例如:10, 100。

如果 input_data 是來自 mltable 的表格式資料,則該數字會定義每個迷你批次的同位實體大小。 預設單位為位元組,此值可以接受字串,例如 100 kb、100 mb。
ParallelRunConfig.source_directory parallel_run_function.task.code 指向原始程式碼的本機或遠端路徑。
ParallelRunConfig.description parallel_run_function.description 平行的易記描述
ParallelRunConfig.logging_level parallel_run_function.logging_level 記錄層級名稱的字串,定義於 'logging' 中。 可能的值為 'WARNING'、'INFO'和 'DEBUG'。 (選用,預設值為 'INFO'。)此值可透過 PipelineParameter 設定。
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout 以秒為單位定義執行自訂 run() 函式的逾時。 如果執行時間高於此臨界值,則會中止迷你批次,並標示為失敗的迷你批次以觸發重試。
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries 迷你批次失敗或逾時時的重試次數。 如果所有重試都失敗,迷你批次將會標示為無法由 mini_batch_error_threshold 計算進行計算。
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to 結合 append_row_to 設定。
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold 在此平行作業中可忽略的失敗迷你批次數目。 如果失敗的迷你批次計數高於此臨界值,平行作業將會標示為失敗。

「-1」是預設數位,這表示在平行作業期間忽略所有失敗的迷你批次。
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments set
--allowed_failed_percent
類似於「allowed_failed_count」,但此設定會使用失敗的迷你批次百分比,而不是迷你批次失敗計數。

此設定的範圍為 [0,100]。 「100」是預設數字,這表示在平行作業期間忽略所有失敗的迷你批次。
ParallelRunConfig.partition_keys 開發中。
ParallelRunConfig.environment_variables parallel_run_function.environment_variables 環境變數名稱和值的字典。 這些環境變數會在執行使用者指令碼的程序上設定。
ParallelRunStep.name parallel_run_function.name 建立的平行作業或元件名稱。
ParallelRunStep.inputs parallel_run_function.inputs 這個平行使用的輸入 dict。
-- parallel_run_function.input_data 宣告要以平行方式分割和處理的資料
ParallelRunStep.output parallel_run_function.outputs 此平行作業的輸出。
ParallelRunStep.side_inputs parallel_run_function.inputs inputs 一起定義。
ParallelRunStep.arguments parallel_run_function.task.program_arguments 平行工作的引數。
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic 指定平行是否會在指定的相同輸入下傳回相同的輸出。

下一步

如需詳細資訊,請參閱這裡的文件: