This article explains how to use the CLI v2 and Python SDK v2 to run parallel jobs in Azure Machine Learning pipelines. Parallel jobs accelerate job execution by distributing repeated tasks on powerful multinode compute clusters.
Machine learning engineers always have scale requirements on their training or inferencing tasks. For example, when a data scientist provides a single script to train a sales prediction model, machine learning engineers need to apply this training task to each individual data store. Challenges of this scale-out process include long execution times that cause delays, and unexpected issues that require manual intervention to keep the task running.
The core job of Azure Machine Learning parallelization is to split a single serial task into mini-batches and dispatch those mini-batches to multiple computes to execute in parallel. Parallel jobs significantly reduce end-to-end execution time and also handle errors automatically. Consider using Azure Machine Learning Parallel job to train many models on top of your partitioned data or to accelerate your large-scale batch inferencing tasks.
For example, in a scenario where you're running an object detection model on a large set of images, Azure Machine Learning parallel jobs let you easily distribute your images to run custom code in parallel on a specific compute cluster. Parallelization can significantly reduce time cost. Azure Machine Learning parallel jobs can also simplify and automate your process to make it more efficient.
Prerequisites
Have an Azure Machine Learning account and workspace.
Install the Azure CLI and the ml extension. For more information, see Install, set up, and use the CLI (v2). The ml extension automatically installs the first time you run an az ml command.
This parallel job step requires preparation. You need an entry script that implements the predefined functions. You also need to set attributes in your parallel job definition that:
Define and bind your input data.
Set the data division method.
Configure your compute resources.
Call the entry script.
The following sections describe how to prepare the parallel job.
Declare the inputs and data division setting
A parallel job requires one major input to be split and processed in parallel. The major input data format can be either tabular data or a list of files.
Different data formats have different input types, input modes, and data division methods. The following table describes the options:
Data format
Input type
Input mode
Data division method
File list
mltable or uri_folder
ro_mount or download
By size (number of files) or by partition
Tabular data
mltable
direct
By size (estimated physical size) or by partition
Note
If you use tabular mltable as your major input data, you need to:
Install the mltable library in your environment, as in line 9 of this conda file.
Have a MLTable specification file under your specified path with the transformations: - read_delimited: section filled out. For examples, see Create and manage data assets.
You can declare your major input data with the input_data attribute in the parallel job YAML or Python, and bind the data with the defined input of your parallel job by using ${{inputs.<input name>}}. Then you define the data division attribute for your major input depending on your data division method.
Configure the compute resources for parallelization
After you define the data division attribute, configure the compute resources for your parallelization by setting the instance_count and max_concurrency_per_instance attributes.
Attribute name
Type
Description
Default value
instance_count
integer
The number of nodes to use for the job.
1
max_concurrency_per_instance
integer
The number of processors on each node.
For a GPU compute: 1. For a CPU compute: number of cores.
These attributes work together with your specified compute cluster, as shown in the following diagram:
Call the entry script
The entry script is a single Python file that implements the following three predefined functions with custom code.
Function name
Required
Description
Input
Return
Init()
Y
Common preparation before starting to run mini-batches. For example, use this function to load the model into a global object.
--
--
Run(mini_batch)
Y
Implements main execution logic for mini-batches.
mini_batch is pandas dataframe if input data is a tabular data, or file path list if input data is a directory.
Dataframe, list, or tuple.
Shutdown()
N
Optional function to do custom cleanups before returning the compute to the pool.
--
--
Important
To avoid exceptions when parsing arguments in Init() or Run(mini_batch) functions, use parse_known_args instead of parse_args. See the iris_score example for an entry script with argument parser.
Important
The Run(mini_batch) function requires a return of either a dataframe, list, or tuple item. The parallel job uses the count of that return to measure the success items under that mini-batch. Mini-batch count should be equal to the return list count if all items have processed.
The parallel job executes the functions in each processor, as shown in the following diagram.
The following parallel job step declares the input type, mode, and data division method, binds the input, configures the compute, and calls the entry script.
The following code declares the job_data_path as input, binds it to the input_data attribute, sets the mini_batch_size data division attribute, and calls the entry script.
# parallel task to process file data
file_batch_inference = parallel_run_function(
name="file_batch_score",
display_name="Batch Score with File Dataset",
description="parallel component for batch score",
inputs=dict(
job_data_path=Input(
type=AssetTypes.MLTABLE,
description="The data to be split and scored in parallel",
)
),
outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
input_data="${{inputs.job_data_path}}",
instance_count=2,
max_concurrency_per_instance=1,
mini_batch_size="1",
mini_batch_error_threshold=1,
retry_settings=dict(max_retries=2, timeout=60),
logging_level="DEBUG",
task=RunFunction(
code="./src",
entry_script="file_batch_inference.py",
program_arguments="--job_output_path ${{outputs.job_output_path}}",
environment="azureml://registries/azureml/environments/sklearn-1.5/labels/latest",
),
)
Consider automation settings
Azure Machine Learning parallel job exposes many optional settings that can automatically control the job without manual intervention. The following table describes these settings.
Key
Type
Description
Allowed values
Default value
Set in attribute or program argument
mini_batch_error_threshold
integer
Number of failed mini-batches to ignore in this parallel job. If the count of failed mini-batches is higher than this threshold, the parallel job is marked as failed.
The mini-batch is marked as failed if: - The count of return from run() is less than the mini-batch input count. - Exceptions are caught in custom run() code.
[-1, int.max]
-1, meaning ignore all failed mini-batches
Attribute mini_batch_error_threshold
mini_batch_max_retries
integer
Number of retries when the mini-batch fails or times out. If all retries fail, the mini-batch is marked as failed per the mini_batch_error_threshold calculation.
[0, int.max]
2
Attribute retry_settings.max_retries
mini_batch_timeout
integer
Timeout in seconds for executing the custom run() function. If execution time is higher than this threshold, the mini-batch is aborted and marked as failed to trigger retry.
(0, 259200]
60
Attribute retry_settings.timeout
item_error_threshold
integer
The threshold of failed items. Failed items are counted by the number gap between inputs and returns from each mini-batch. If the sum of failed items is higher than this threshold, the parallel job is marked as failed.
[-1, int.max]
-1, meaning ignore all failures during parallel job
Program argument --error_threshold
allowed_failed_percent
integer
Similar to mini_batch_error_threshold, but uses the percent of failed mini-batches instead of the count.
[0, 100]
100
Program argument --allowed_failed_percent
overhead_timeout
integer
Timeout in seconds for initialization of each mini-batch. For example, load mini-batch data and pass it to the run() function.
(0, 259200]
600
Program argument --task_overhead_timeout
progress_update_timeout
integer
Timeout in seconds for monitoring the progress of mini-batch execution. If no progress updates are received within this timeout setting, the parallel job is marked as failed.
(0, 259200]
Dynamically calculated by other settings
Program argument --progress_update_timeout
first_task_creation_timeout
integer
Timeout in seconds for monitoring the time between the job start and the run of the first mini-batch.
(0, 259200]
600
Program argument --first_task_creation_timeout
logging_level
string
The level of logs to dump to user log files.
INFO, WARNING, or DEBUG
INFO
Attribute logging_level
append_row_to
string
Aggregate all returns from each run of the mini-batch and output it into this file. May refer to one of the outputs of the parallel job by using the expression ${{outputs.<output_name>}}
Attribute task.append_row_to
copy_logs_to_parent
string
Boolean option whether to copy the job progress, overview, and logs to the parent pipeline job.
True or False
False
Program argument --copy_logs_to_parent
resource_monitor_interval
integer
Time interval in seconds to dump node resource usage (for example cpu or memory) to log folder under the logs/sys/perf path.
Note: Frequent dump resource logs slightly slow execution speed. Set this value to 0 to stop dumping resource usage.
First, import the required libraries, initiate the ml_client with proper credentials, and create or retrieve your computes:
# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, Input, Output, load_component
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import Environment
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.ai.ml.parallel import parallel_run_function, RunFunction
try:
credential = DefaultAzureCredential()
# Check if given credential can get token successfully.
credential.get_token("https://management.azure.com/.default")
except Exception as ex:
# Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
credential = InteractiveBrowserCredential()
# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)
# Retrieve an already attached Azure Machine Learning Compute.
cpu_compute_target = "cpu-cluster"
print(ml_client.compute.get(cpu_compute_target))
gpu_compute_target = "gpu-cluster"
print(ml_client.compute.get(gpu_compute_target))
Then, implement the parallel job by completing the parallel_run_function:
# parallel task to process tabular data
tabular_batch_inference = parallel_run_function(
name="batch_score_with_tabular_input",
display_name="Batch Score with Tabular Dataset",
description="parallel component for batch score",
inputs=dict(
job_data_path=Input(
type=AssetTypes.MLTABLE,
description="The data to be split and scored in parallel",
),
score_model=Input(
type=AssetTypes.URI_FOLDER, description="The model for batch score."
),
),
outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
input_data="${{inputs.job_data_path}}",
instance_count=2,
max_concurrency_per_instance=2,
mini_batch_size="100",
mini_batch_error_threshold=5,
logging_level="DEBUG",
retry_settings=dict(max_retries=2, timeout=60),
task=RunFunction(
code="./src",
entry_script="tabular_batch_inference.py",
environment=Environment(
image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
conda_file="./src/environment_parallel.yml",
),
program_arguments="--model ${{inputs.score_model}} "
"--job_output_path ${{outputs.job_output_path}} "
"--error_threshold 5 "
"--allowed_failed_percent 30 "
"--task_overhead_timeout 1200 "
"--progress_update_timeout 600 "
"--first_task_creation_timeout 600 "
"--copy_logs_to_parent True "
"--resource_monitor_interva 20 ",
append_row_to="${{outputs.job_output_path}}",
),
)
Finally, use your parallel job as a step in your pipeline and bind its inputs/outputs with other steps:
@pipeline()
def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
# output of file & tabular data should be type MLTable
prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
batch_inference_with_file_data = file_batch_inference(
job_data_path=prepare_file_tabular_data.outputs.file_output_data
)
# use eval_mount mode to handle file data
batch_inference_with_file_data.inputs.job_data_path.mode = (
InputOutputModes.EVAL_MOUNT
)
batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
batch_inference_with_tabular_data = tabular_batch_inference(
job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
score_model=pipeline_score_model,
)
# use direct mode to handle tabular data
batch_inference_with_tabular_data.inputs.job_data_path.mode = (
InputOutputModes.DIRECT
)
return {
"pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
"pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
}
pipeline_job_data_path = Input(
path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
)
pipeline_score_model = Input(
path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
)
# create a pipeline
pipeline_job = parallel_in_pipeline(
pipeline_job_data_path=pipeline_job_data_path,
pipeline_score_model=pipeline_score_model,
)
pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
# set pipeline level compute
pipeline_job.settings.default_compute = "cpu-cluster"
After you submit a pipeline job, the SDK or CLI widget gives you a web URL link to the pipeline graph in the Azure Machine Learning studio UI.
To view parallel job results, double-click the parallel step in the pipeline graph, select the Settings tab in the details panel, expand Run settings, and then expand the Parallel section.
To debug parallel job failure, select the Outputs + logs tab, expand the logs folder, and check job_result.txt to understand why the parallel job failed. For information about the logging structure of parallel jobs, see readme.txt in the same folder.
Manage data ingestion and preparation, model training and deployment, and machine learning solution monitoring with Python, Azure Machine Learning and MLflow.