Use parallel jobs in pipelines

APPLIES TO: Azure CLI ml extension v2 (current), Python SDK azure-ai-ml v2 (current)

This article explains how to use the CLI v2 and Python SDK v2 to run parallel jobs in Azure Machine Learning pipelines. Parallel jobs accelerate job execution by distributing repeated tasks on powerful multinode compute clusters.

Machine learning engineers often need to scale out their training or inferencing tasks. For example, when a data scientist provides a single script to train a sales prediction model, machine learning engineers need to apply this training task to each individual data store. Challenges of this scale-out process include long execution times that cause delays, and unexpected issues that require manual intervention to keep the task running.

The core job of Azure Machine Learning parallelization is to split a single serial task into mini-batches and dispatch those mini-batches to multiple computes to execute in parallel. Parallel jobs significantly reduce end-to-end execution time and also handle errors automatically. Consider using an Azure Machine Learning parallel job to train many models on top of your partitioned data or to accelerate your large-scale batch inferencing tasks.
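The split-and-dispatch idea can be illustrated with a small local sketch. This isn't how Azure Machine Learning implements parallel jobs; it only shows the pattern on a single machine with a thread pool. The function names and the squaring task are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_mini_batches(items, mini_batch_size):
    """Split a list of work items into consecutive mini-batches."""
    return [
        items[start:start + mini_batch_size]
        for start in range(0, len(items), mini_batch_size)
    ]


def process_mini_batch(mini_batch):
    """Hypothetical per-batch task: return one result per input item."""
    return [item * item for item in mini_batch]


def run_in_parallel(items, mini_batch_size, max_workers):
    """Dispatch mini-batches to a worker pool and collect results in order."""
    mini_batches = split_into_mini_batches(items, mini_batch_size)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        batch_results = list(pool.map(process_mini_batch, mini_batches))
    # Flatten the per-batch results back into one list.
    return [result for batch in batch_results for result in batch]


results = run_in_parallel(list(range(10)), mini_batch_size=3, max_workers=4)
print(results)
```

In a parallel job, the worker pool corresponds to the processors on the nodes of your compute cluster rather than to local threads.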

For example, in a scenario where you're running an object detection model on a large set of images, Azure Machine Learning parallel jobs let you easily distribute your images to run custom code in parallel on a specific compute cluster. Parallelization can significantly reduce processing time. Azure Machine Learning parallel jobs can also simplify and automate your process to make it more efficient.

Prerequisites

Create and run a pipeline with a parallel job step

An Azure Machine Learning parallel job can be used only as a step in a pipeline job.

Prepare for parallelization

This parallel job step requires preparation. You need an entry script that implements the predefined functions. You also need to set attributes in your parallel job definition that:

  • Define and bind your input data.
  • Set the data division method.
  • Configure your compute resources.
  • Call the entry script.

The following sections describe how to prepare the parallel job.

Declare the inputs and data division setting

A parallel job requires one major input to be split and processed in parallel. The major input data format can be either tabular data or a list of files.

Different data formats have different input types, input modes, and data division methods. The following table describes the options:

Data format | Input type | Input mode | Data division method
File list | mltable or uri_folder | ro_mount or download | By size (number of files) or by partition
Tabular data | mltable | direct | By size (estimated physical size) or by partition

Note

If you use tabular mltable as your major input data, you need to:

  • Install the mltable library in your environment, as in line 9 of this conda file.
  • Have an MLTable specification file under your specified path with the transformations: - read_delimited: section filled out. For examples, see Create and manage data assets.

You can declare your major input data with the input_data attribute in the parallel job YAML or Python, and bind the data with the defined input of your parallel job by using ${{inputs.<input name>}}. Then you define the data division attribute for your major input depending on your data division method.

Data division method | Attribute name | Attribute type | Job example
By size | mini_batch_size | string | Iris batch prediction
By partition | partition_keys | list of strings | Orange juice sales prediction
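To make the difference between the two division methods concrete, the following sketch divides a file list by size and a set of rows by partition keys. It's a local illustration only, not the service's implementation. The helper names and the store and brand columns are hypothetical, and the sketch takes the batch size as an integer although the mini_batch_size attribute is a string in the job definition.

```python
from collections import defaultdict


def divide_by_size(file_paths, mini_batch_size):
    """Group a file list into mini-batches of at most mini_batch_size files."""
    return [
        file_paths[i:i + mini_batch_size]
        for i in range(0, len(file_paths), mini_batch_size)
    ]


def divide_by_partition(rows, partition_keys):
    """Group rows so that each mini-batch holds one key combination."""
    partitions = defaultdict(list)
    for row in rows:
        key = tuple(row[name] for name in partition_keys)
        partitions[key].append(row)
    return dict(partitions)


files = ["img_1.jpg", "img_2.jpg", "img_3.jpg"]
by_size = divide_by_size(files, mini_batch_size=2)

rows = [
    {"store": 1, "brand": "a", "sales": 10},
    {"store": 1, "brand": "a", "sales": 12},
    {"store": 2, "brand": "b", "sales": 7},
]
by_partition = divide_by_partition(rows, partition_keys=["store", "brand"])
print(by_size)
print(sorted(by_partition))
```

Dividing by partition keeps all rows that share the same key values in the same mini-batch, which is why it suits scenarios like training one model per partition.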

Configure the compute resources for parallelization

After you define the data division attribute, configure the compute resources for your parallelization by setting the instance_count and max_concurrency_per_instance attributes.

Attribute name | Type | Description | Default value
instance_count | integer | The number of nodes to use for the job. | 1
max_concurrency_per_instance | integer | The number of processors on each node. | For a GPU compute: 1. For a CPU compute: number of cores.

These attributes work together with your specified compute cluster, as shown in the following diagram:

Diagram showing how distributed data works in parallel job.
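As a rough way to reason about these two attributes, the following sketch estimates how many mini-batches can run at the same time and how many rounds of execution a job needs. It assumes that each processor handles one mini-batch at a time and that all mini-batches take about the same time, which is a simplification. The function names are hypothetical.

```python
import math


def total_worker_slots(instance_count, max_concurrency_per_instance):
    """Number of mini-batches that can run at the same time."""
    return instance_count * max_concurrency_per_instance


def scheduling_rounds(mini_batch_count, instance_count, max_concurrency_per_instance):
    """Rounds needed if every mini-batch takes about the same time."""
    slots = total_worker_slots(instance_count, max_concurrency_per_instance)
    return math.ceil(mini_batch_count / slots)


slots = total_worker_slots(instance_count=2, max_concurrency_per_instance=2)
rounds = scheduling_rounds(10, instance_count=2, max_concurrency_per_instance=2)
print(slots, rounds)
```

With two nodes and two processors per node, four mini-batches run concurrently, so ten mini-batches need about three rounds under these assumptions.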

Call the entry script

The entry script is a single Python file that implements the following predefined functions with custom code. The init() and run(mini_batch) functions are required, and shutdown() is optional.

Function name | Required | Description | Input | Return
init() | Y | Common preparation before starting to run mini-batches. For example, use this function to load the model into a global object. | -- | --
run(mini_batch) | Y | Implements main execution logic for mini-batches. | mini_batch is a pandas dataframe if the input data is tabular data, or a file path list if the input data is a directory. | Dataframe, list, or tuple.
shutdown() | N | Optional function to do custom cleanups before returning the compute to the pool. | -- | --

Important

To avoid exceptions when parsing arguments in the init() or run(mini_batch) functions, use parse_known_args instead of parse_args. See the iris_score example for an entry script with an argument parser.
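The following sketch shows the difference between the two argparse calls when the script receives an argument it didn't define. The --some_system_flag argument is a hypothetical stand-in for any argument that your parser doesn't declare.

```python
import argparse

parser = argparse.ArgumentParser(allow_abbrev=False)
parser.add_argument("--job_output_path", type=str, default=None)

# Simulated command line: one declared argument and one undeclared argument.
argv = ["--job_output_path", "/tmp/out", "--some_system_flag", "1"]

# parse_known_args returns the parsed namespace plus the leftover arguments
# instead of exiting when it meets an argument that the parser doesn't know.
args, unknown = parser.parse_known_args(argv)
print(args.job_output_path)
print(unknown)
```

Calling parser.parse_args(argv) with the same list exits with an error because --some_system_flag isn't declared.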

Important

The run(mini_batch) function must return a dataframe, list, or tuple. The parallel job uses the number of returned items to count the successful items in that mini-batch. If all items are processed, the returned item count should equal the mini-batch input count.

The parallel job executes the functions in each processor, as shown in the following diagram.

Diagram showing how entry script works in parallel job.
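A minimal entry script for file list input might look like the following sketch. The scoring logic is a hypothetical stand-in for a real model, and the file name length is used only to produce a deterministic value. The sketch returns one result per input file so that the return count matches the mini-batch count.

```python
import os

# Global object that init() populates once and run() reuses.
model = None


def _fake_model(file_path):
    """Hypothetical stand-in for a real model: score by file name length."""
    return len(os.path.basename(file_path))


def init():
    """Run once before any mini-batch: load shared state."""
    global model
    model = _fake_model


def run(mini_batch):
    """Process one mini-batch of file paths and return one result per file."""
    results = []
    for file_path in mini_batch:
        score = model(file_path)
        results.append(f"{os.path.basename(file_path)},{score}")
    return results


def shutdown():
    """Optional cleanup before the compute returns to the pool."""
    global model
    model = None
```

To try the script locally, call init(), pass a list of file paths to run(), and then call shutdown().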

See the following entry script examples:

To call the entry script, set the following two attributes in your parallel job definition:

Attribute name | Type | Description
code | string | Local path to the source code directory to upload and use for the job.
entry_script | string | The Python file that contains the implementation of predefined parallel functions.

Parallel job step example

The following code declares the job_data_path as input, binds it to the input_data attribute, sets the mini_batch_size data division attribute, and calls the entry script.

# parallel task to process file data
file_batch_inference = parallel_run_function(
    name="file_batch_score",
    display_name="Batch Score with File Dataset",
    description="parallel component for batch score",
    inputs=dict(
        job_data_path=Input(
            type=AssetTypes.MLTABLE,
            description="The data to be split and scored in parallel",
        )
    ),
    outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
    input_data="${{inputs.job_data_path}}",
    instance_count=2,
    max_concurrency_per_instance=1,
    mini_batch_size="1",
    mini_batch_error_threshold=1,
    retry_settings=dict(max_retries=2, timeout=60),
    logging_level="DEBUG",
    task=RunFunction(
        code="./src",
        entry_script="file_batch_inference.py",
        program_arguments="--job_output_path ${{outputs.job_output_path}}",
        environment="azureml://registries/azureml/environments/sklearn-1.5/labels/latest",
    ),
)

Consider automation settings

Azure Machine Learning parallel jobs expose many optional settings that can automatically control the job without manual intervention. The following table describes these settings.

Key | Type | Description | Allowed values | Default value | Set in attribute or program argument
mini_batch_error_threshold | integer | Number of failed mini-batches to ignore in this parallel job. If the count of failed mini-batches is higher than this threshold, the parallel job is marked as failed. A mini-batch is marked as failed if the count of returns from run() is less than the mini-batch input count, or if exceptions are caught in custom run() code. | [-1, int.max] | -1, meaning ignore all failed mini-batches | Attribute mini_batch_error_threshold
mini_batch_max_retries | integer | Number of retries when the mini-batch fails or times out. If all retries fail, the mini-batch is marked as failed per the mini_batch_error_threshold calculation. | [0, int.max] | 2 | Attribute retry_settings.max_retries
mini_batch_timeout | integer | Timeout in seconds for executing the custom run() function. If execution time is higher than this threshold, the mini-batch is aborted and marked as failed to trigger retry. | (0, 259200] | 60 | Attribute retry_settings.timeout
item_error_threshold | integer | The threshold of failed items. Failed items are counted by the number gap between inputs and returns from each mini-batch. If the sum of failed items is higher than this threshold, the parallel job is marked as failed. | [-1, int.max] | -1, meaning ignore all failures during parallel job | Program argument --error_threshold
allowed_failed_percent | integer | Similar to mini_batch_error_threshold, but uses the percent of failed mini-batches instead of the count. | [0, 100] | 100 | Program argument --allowed_failed_percent
overhead_timeout | integer | Timeout in seconds for initialization of each mini-batch. For example, load mini-batch data and pass it to the run() function. | (0, 259200] | 600 | Program argument --task_overhead_timeout
progress_update_timeout | integer | Timeout in seconds for monitoring the progress of mini-batch execution. If no progress updates are received within this timeout setting, the parallel job is marked as failed. | (0, 259200] | Dynamically calculated by other settings | Program argument --progress_update_timeout
first_task_creation_timeout | integer | Timeout in seconds for monitoring the time between the job start and the run of the first mini-batch. | (0, 259200] | 600 | Program argument --first_task_creation_timeout
logging_level | string | The level of logs to dump to user log files. | INFO, WARNING, or DEBUG | INFO | Attribute logging_level
append_row_to | string | Aggregate all returns from each run of the mini-batch and output it into this file. May refer to one of the outputs of the parallel job by using the expression ${{outputs.<output_name>}} | -- | -- | Attribute task.append_row_to
copy_logs_to_parent | string | Boolean option whether to copy the job progress, overview, and logs to the parent pipeline job. | True or False | False | Program argument --copy_logs_to_parent
resource_monitor_interval | integer | Time interval in seconds to dump node resource usage (for example, CPU or memory) to the log folder under the logs/sys/perf path. Note: Frequently dumping resource logs slightly slows execution. Set this value to 0 to stop dumping resource usage. | [0, int.max] | 600 | Program argument --resource_monitor_interval
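The way the three failure thresholds interact can be sketched as follows. This is an illustration of the rules described for mini_batch_error_threshold, item_error_threshold, and allowed_failed_percent, not the service's actual evaluation logic, and it ignores retries. The function names and the tuple layout are hypothetical.

```python
def summarize_failures(mini_batches):
    """Count failed mini-batches and failed items.

    Each mini-batch is a tuple: (input_count, return_count, raised_exception).
    """
    failed_batches = 0
    failed_items = 0
    for input_count, return_count, raised_exception in mini_batches:
        if raised_exception or return_count < input_count:
            failed_batches += 1
        # Failed items are the gap between inputs and returns.
        failed_items += max(input_count - return_count, 0)
    return failed_batches, failed_items


def job_should_fail(mini_batches, mini_batch_error_threshold=-1,
                    item_error_threshold=-1, allowed_failed_percent=100):
    """Apply the three thresholds to the collected mini-batch outcomes."""
    if not mini_batches:
        return False
    failed_batches, failed_items = summarize_failures(mini_batches)
    failed_percent = 100.0 * failed_batches / len(mini_batches)
    # A threshold of -1 means that failures of that kind are ignored.
    over_batch_limit = (mini_batch_error_threshold != -1
                        and failed_batches > mini_batch_error_threshold)
    over_item_limit = (item_error_threshold != -1
                       and failed_items > item_error_threshold)
    over_percent_limit = failed_percent > allowed_failed_percent
    return over_batch_limit or over_item_limit or over_percent_limit


# One clean mini-batch, one with two missing returns, one that raised.
outcomes = [(10, 10, False), (10, 8, False), (10, 0, True)]
print(summarize_failures(outcomes))
print(job_should_fail(outcomes))
print(job_should_fail(outcomes, mini_batch_error_threshold=1))
```

With the default values, none of the thresholds is exceeded, so the job in this sketch isn't marked as failed even though two of three mini-batches failed.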

The following sample code updates these settings:

# parallel task to process tabular data
tabular_batch_inference = parallel_run_function(
    name="batch_score_with_tabular_input",
    display_name="Batch Score with Tabular Dataset",
    description="parallel component for batch score",
    inputs=dict(
        job_data_path=Input(
            type=AssetTypes.MLTABLE,
            description="The data to be split and scored in parallel",
        ),
        score_model=Input(
            type=AssetTypes.URI_FOLDER, description="The model for batch score."
        ),
    ),
    outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
    input_data="${{inputs.job_data_path}}",
    instance_count=2,
    max_concurrency_per_instance=2,
    mini_batch_size="100",
    mini_batch_error_threshold=5,
    logging_level="DEBUG",
    retry_settings=dict(max_retries=2, timeout=60),
    task=RunFunction(
        code="./src",
        entry_script="tabular_batch_inference.py",
        environment=Environment(
            image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
            conda_file="./src/environment_parallel.yml",
        ),
        program_arguments="--model ${{inputs.score_model}} "
        "--job_output_path ${{outputs.job_output_path}} "
        "--error_threshold 5 "
        "--allowed_failed_percent 30 "
        "--task_overhead_timeout 1200 "
        "--progress_update_timeout 600 "
        "--first_task_creation_timeout 600 "
        "--copy_logs_to_parent True "
        "--resource_monitor_interval 20 ",
        append_row_to="${{outputs.job_output_path}}",
    ),
)
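Because program_arguments is a single string, a misspelled argument name is easy to miss. One option is to build the string from dictionaries and check the names first, as in the following sketch. The helper and the SYSTEM_ARGUMENTS set are hypothetical and aren't part of the SDK; the set lists only the program argument names from the settings table in this article.

```python
# Program argument names taken from the settings table in this article.
SYSTEM_ARGUMENTS = {
    "error_threshold",
    "allowed_failed_percent",
    "task_overhead_timeout",
    "progress_update_timeout",
    "first_task_creation_timeout",
    "copy_logs_to_parent",
    "resource_monitor_interval",
}


def build_program_arguments(script_arguments, system_arguments):
    """Join script and system arguments into one program_arguments string."""
    unknown = set(system_arguments) - SYSTEM_ARGUMENTS
    if unknown:
        raise ValueError(f"Unknown parallel job arguments: {sorted(unknown)}")
    merged = {**script_arguments, **system_arguments}
    return " ".join(f"--{name} {value}" for name, value in merged.items())


program_arguments = build_program_arguments(
    script_arguments={
        "model": "${{inputs.score_model}}",
        "job_output_path": "${{outputs.job_output_path}}",
    },
    system_arguments={"error_threshold": 5, "resource_monitor_interval": 20},
)
print(program_arguments)
```

You can then pass the resulting string as the program_arguments value of RunFunction.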

Create the pipeline with parallel job step

First, import the required libraries, initiate the ml_client with proper credentials, and create or retrieve your computes:

# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, Input, Output, load_component
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import Environment
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.ai.ml.parallel import parallel_run_function, RunFunction
try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential if DefaultAzureCredential doesn't work
    credential = InteractiveBrowserCredential()
# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)

# Retrieve an already attached Azure Machine Learning Compute.
cpu_compute_target = "cpu-cluster"
print(ml_client.compute.get(cpu_compute_target))
gpu_compute_target = "gpu-cluster"
print(ml_client.compute.get(gpu_compute_target))

Then, implement the parallel job by completing the parallel_run_function:

# parallel task to process tabular data
tabular_batch_inference = parallel_run_function(
    name="batch_score_with_tabular_input",
    display_name="Batch Score with Tabular Dataset",
    description="parallel component for batch score",
    inputs=dict(
        job_data_path=Input(
            type=AssetTypes.MLTABLE,
            description="The data to be split and scored in parallel",
        ),
        score_model=Input(
            type=AssetTypes.URI_FOLDER, description="The model for batch score."
        ),
    ),
    outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
    input_data="${{inputs.job_data_path}}",
    instance_count=2,
    max_concurrency_per_instance=2,
    mini_batch_size="100",
    mini_batch_error_threshold=5,
    logging_level="DEBUG",
    retry_settings=dict(max_retries=2, timeout=60),
    task=RunFunction(
        code="./src",
        entry_script="tabular_batch_inference.py",
        environment=Environment(
            image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
            conda_file="./src/environment_parallel.yml",
        ),
        program_arguments="--model ${{inputs.score_model}} "
        "--job_output_path ${{outputs.job_output_path}} "
        "--error_threshold 5 "
        "--allowed_failed_percent 30 "
        "--task_overhead_timeout 1200 "
        "--progress_update_timeout 600 "
        "--first_task_creation_timeout 600 "
        "--copy_logs_to_parent True "
        "--resource_monitor_interval 20 ",
        append_row_to="${{outputs.job_output_path}}",
    ),
)

Finally, use your parallel job as a step in your pipeline and bind its inputs/outputs with other steps:

@pipeline()
def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):

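    # prepare_data is assumed to be a component that is defined or loaded
    # earlier, for example with load_component.
    prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)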
    # output of file & tabular data should be type MLTable
    prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
    prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE

    batch_inference_with_file_data = file_batch_inference(
        job_data_path=prepare_file_tabular_data.outputs.file_output_data
    )
    # use eval_mount mode to handle file data
    batch_inference_with_file_data.inputs.job_data_path.mode = (
        InputOutputModes.EVAL_MOUNT
    )
    batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE

    batch_inference_with_tabular_data = tabular_batch_inference(
        job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
        score_model=pipeline_score_model,
    )
    # use direct mode to handle tabular data
    batch_inference_with_tabular_data.inputs.job_data_path.mode = (
        InputOutputModes.DIRECT
    )

    return {
        "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
        "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
    }


pipeline_job_data_path = Input(
    path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
)
pipeline_score_model = Input(
    path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
)
# create a pipeline
pipeline_job = parallel_in_pipeline(
    pipeline_job_data_path=pipeline_job_data_path,
    pipeline_score_model=pipeline_score_model,
)
pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE

# set pipeline level compute
pipeline_job.settings.default_compute = "cpu-cluster"

Submit the pipeline job

Submit your pipeline job with the parallel step by using the jobs.create_or_update function of ml_client:

pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_samples"
)
pipeline_job

Check parallel step in studio UI

After you submit a pipeline job, the SDK or CLI widget gives you a URL link to the pipeline graph in the Azure Machine Learning studio UI.

To view parallel job results, double-click the parallel step in the pipeline graph, select the Settings tab in the details panel, expand Run settings, and then expand the Parallel section.

Screenshot of Azure Machine Learning studio showing the parallel job settings.

To debug a parallel job failure, select the Outputs + logs tab, expand the logs folder, and check job_result.txt to understand why the parallel job failed. For information about the logging structure of parallel jobs, see readme.txt in the same folder.

Screenshot of Azure Machine Learning studio on the jobs tab showing the parallel job results.