How to use parallel job in pipeline (V2)

APPLIES TO: Azure CLI ml extension v2 (current) Python SDK azure-ai-ml v2 (current)

A parallel job lets you accelerate job execution by distributing repeated tasks across powerful multi-node compute clusters. For example, suppose you're running an object detection model on a large set of images. With an Azure ML parallel job, you can distribute your images so that custom code runs on them in parallel on a specific compute cluster. Parallelization can significantly reduce execution time. A parallel job also lets you simplify and automate your process to make it more efficient.

Prerequisite

An Azure ML parallel job can only be used as one of the steps in a pipeline job, so it's important to be familiar with pipelines. To learn more about Azure ML pipelines, see the following articles.

Why are parallel jobs needed?

In the real world, ML engineers often have scale requirements for their training or inferencing tasks. For example, when a data scientist provides a single script to train a sales prediction model, ML engineers need to apply this training task to each individual store. During this scale-out process, some challenges are:

  • Delay pressure caused by long execution time.
  • Manual intervention to handle unexpected issues to keep the task proceeding.

The core value of Azure ML parallel job is to split a single serial task into mini-batches and dispatch those mini-batches to multiple computes to execute in parallel. By using parallel jobs, we can:

  • Significantly reduce end-to-end execution time.
  • Use Azure ML parallel job's automatic error handling settings.

You should consider using Azure ML Parallel job if:

  • You plan to train many models on top of your partitioned data.
  • You want to accelerate your large scale batch inferencing task.

Prepare for parallel job

Unlike other types of jobs, a parallel job requires preparation. Follow the next sections to prepare for creating your parallel job.

Declare the inputs to be distributed and partition setting

A parallel job requires exactly one major input to be split and processed in parallel. The major input can be either tabular data or a set of files. Different input types support different partition methods.

The following table illustrates the relation between input data and partition setting:

Data format  | AML input type        | AML input mode       | Partition method
File list    | mltable or uri_folder | ro_mount or download | By size (number of files)
Tabular data | mltable               | direct               | By size (estimated physical size)

You can declare your major input with the input_data attribute in the parallel job YAML or Python SDK, and bind it to one of the parallel job's defined inputs by using ${{inputs.<input name>}}. Then define the partition method for your major input.

For example, you can set mini_batch_size to partition your data by size.

  • When using file list input, this value defines the number of files for each mini-batch.
  • When using tabular input, this value defines the estimated physical size for each mini-batch.
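To make the file-list case concrete, here is a minimal sketch of what partitioning by number of files amounts to. It is an illustration only, not the service's implementation: the helper name split_into_mini_batches and the example file paths are hypothetical, and the real job performs this split for you.

```python
from typing import List


def split_into_mini_batches(file_paths: List[str], mini_batch_size: int) -> List[List[str]]:
    """Group file paths into consecutive mini-batches of at most mini_batch_size files."""
    if mini_batch_size <= 0:
        raise ValueError("mini_batch_size must be a positive integer")
    return [
        file_paths[start:start + mini_batch_size]
        for start in range(0, len(file_paths), mini_batch_size)
    ]


# Hypothetical input: 10 image files, partitioned 4 files per mini-batch.
files = [f"images/img_{i:02d}.jpg" for i in range(10)]
mini_batches = split_into_mini_batches(files, mini_batch_size=4)
print([len(batch) for batch in mini_batches])  # [4, 4, 2]
```

The last mini-batch is smaller when the file count is not a multiple of the mini-batch size. For tabular input the value is an estimated physical size (such as "10kb") rather than a count, so this sketch does not apply to that case.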

APPLIES TO: Azure CLI ml extension v2 (current)

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

Once you have the partition setting defined, you can configure the parallel settings by using the two attributes below:

Attribute name               | Type    | Description                             | Default value
instance_count               | integer | The number of nodes to use for the job. | 1
max_concurrency_per_instance | integer | The number of processors on each node.  | For a GPU compute, the default value is 1. For a CPU compute, the default value is the number of cores.

These two attributes work together with your specified compute cluster.

Diagram showing how distributed data works in parallel job.
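As a rough worked example of how the two attributes combine, the sketch below assumes that each node runs max_concurrency_per_instance worker processes, so the number of mini-batches in flight at once is the product of the two values. The function names and the mini-batch count are hypothetical, and the round estimate assumes every mini-batch takes the same time, which real workloads rarely do.

```python
import math


def total_worker_slots(instance_count: int, max_concurrency_per_instance: int) -> int:
    """Number of mini-batches that can be processed at the same time (assumed model)."""
    return instance_count * max_concurrency_per_instance


def scheduling_rounds(mini_batch_count: int, slots: int) -> int:
    """Lower bound on sequential rounds if every mini-batch took equal time."""
    return math.ceil(mini_batch_count / slots)


# Values taken from the YAML sample: 2 nodes, 2 processes per node.
slots = total_worker_slots(instance_count=2, max_concurrency_per_instance=2)
# Hypothetical workload of 10 mini-batches.
rounds = scheduling_rounds(mini_batch_count=10, slots=slots)
print(slots, rounds)  # 4 3
```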

Sample code to set two attributes:

APPLIES TO: Azure CLI ml extension v2 (current)

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

Note

If you use a tabular mltable as your major input, the MLTABLE specification file under your specified path must have its transformations - read_delimited section filled in. For more examples, see Create a mltable data asset.

Implement predefined functions in entry script

The entry script is a single Python file in which you implement three predefined functions with custom code. Azure ML parallel job follows the diagram below to execute them in each processor.

Diagram showing how entry script works in parallel job.

Function name   | Required | Description | Input | Return
Init()          | Y | Use this function for common preparation before starting to run mini-batches. For example, use it to load the model into a global object. | -- | --
Run(mini_batch) | Y | Implement the main execution logic for mini-batches. | mini_batch: a Pandas dataframe if the input data is tabular data; a list of file paths if the input data is a directory. | Dataframe, List, or Tuple.
Shutdown()      | N | Optional function to do custom cleanup before returning the compute to the pool. | -- | --
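The three functions can be sketched as a skeleton entry script. This is a minimal illustration under stated assumptions, not the article's iris example: the function names are written in lowercase as ordinary Python functions (matching the run() spelling used in the settings table), the input is assumed to be a file list, and _fake_model is a hypothetical stand-in for a real model.

```python
from typing import Callable, List, Optional

# Global handle populated once per worker process by init().
model: Optional[Callable[[str], str]] = None


def _fake_model(path: str) -> str:
    """Hypothetical stand-in for a real model: labels a file by its extension."""
    return "image" if path.lower().endswith((".jpg", ".png")) else "other"


def init() -> None:
    """Common preparation: load the model into a global object."""
    global model
    model = _fake_model


def run(mini_batch: List[str]) -> List[str]:
    """Process one mini-batch and return one result per input item."""
    results = []
    for file_path in mini_batch:
        results.append(f"{file_path},{model(file_path)}")
    return results


def shutdown() -> None:
    """Optional cleanup before the compute is returned to the pool."""
    global model
    model = None


# Local smoke test that mimics the call order: init once, run per mini-batch, shutdown.
init()
output = run(["a.jpg", "b.txt"])
print(output)  # ['a.jpg,image', 'b.txt,other']
shutdown()
```

Returning exactly one entry per input item keeps the returned count equal to the mini-batch count, which is how successful items are measured.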

Check the following entry script examples to get more details:

Once you have the entry script ready, you can set the following two attributes to use it in your parallel job:

Attribute name | Type   | Description | Default value
code           | string | Local path to the source code directory to be uploaded and used for the job. |
entry_script   | string | The Python file that contains the implementation of pre-defined parallel functions. |

Sample code to set two attributes:

APPLIES TO: Azure CLI ml extension v2 (current)

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

  task:
    type: run_function
    code: "./script"
    entry_script: iris_prediction.py
    environment:
      name: "prs-env"
      version: 1
      image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
      conda_file: ./environment/environment_parallel.yml
    program_arguments: >-
      --model ${{inputs.score_model}}
      --error_threshold 5
      --allowed_failed_percent 30
      --task_overhead_timeout 1200
      --progress_update_timeout 600
      --first_task_creation_timeout 600
      --copy_logs_to_parent True
      --resource_monitor_interval 20
    append_row_to: ${{outputs.job_output_file}}

Important

The Run(mini_batch) function must return a dataframe, list, or tuple. The parallel job uses the count of that return to measure the successful items in the mini-batch. Ideally, the mini-batch count equals the returned count if all items in the mini-batch were processed successfully.

Important

If you want to parse arguments in the Init() or Run(mini_batch) function, use "parse_known_args" instead of "parse_args" to avoid exceptions. See the iris_score example for an entry script with an argument parser.
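The following sketch shows why parse_known_args is the safer choice. It assumes the entry script receives its own --model flag alongside job-level flags it does not declare; the helper name parse_entry_script_args and the example argument list are hypothetical.

```python
import argparse
from typing import List, Optional


def parse_entry_script_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse only the flags this script declares and ignore everything else."""
    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument("--model", type=str, default=None)
    # parse_known_args returns (namespace, leftover_args) instead of exiting
    # with an error when it meets flags the parser does not declare.
    args, _unknown = parser.parse_known_args(argv)
    return args


# Hypothetical argument list mixing the script's own flag with job-level flags.
example_argv = [
    "--model", "./iris-model",
    "--error_threshold", "5",
    "--copy_logs_to_parent", "True",
]
args = parse_entry_script_args(example_argv)
print(args.model)  # ./iris-model
```

With parse_args, the same argument list would stop the process with an "unrecognized arguments" error, because --error_threshold and --copy_logs_to_parent are not declared by this parser.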

Consider automation settings

Azure ML parallel job exposes numerous settings to automatically control the job without manual intervention. See the following table for the details.

Key | Type | Description | Allowed values | Default value | Set in attribute | Set in program arguments
mini batch error threshold | integer | Defines the number of failed mini-batches that can be ignored in this parallel job. If the count of failed mini-batches is higher than this threshold, the parallel job is marked as failed. A mini-batch is marked as failed if the count of returns from run() is less than the mini-batch input count, or if exceptions are caught in the custom run() code. "-1" is the default, which means to ignore all failed mini-batches during the parallel job. | [-1, int.max] | -1 | mini_batch_error_threshold | N/A
mini batch max retries | integer | Defines the number of retries when a mini-batch fails or times out. If all retries fail, the mini-batch is marked as failed and counted in the mini_batch_error_threshold calculation. | [0, int.max] | 2 | retry_settings.max_retries | N/A
mini batch timeout | integer | Defines the timeout in seconds for executing the custom run() function. If the execution time is higher than this threshold, the mini-batch is aborted and marked as a failed mini-batch to trigger a retry. | (0, 259200] | 60 | retry_settings.timeout | N/A
item error threshold | integer | The threshold of failed items. Failed items are counted by the gap between the number of inputs and the number of returns from each mini-batch. If the sum of failed items is higher than this threshold, the parallel job is marked as failed. Note: "-1" is the default, which means to ignore all failures during the parallel job. | [-1, int.max] | -1 | N/A | --error_threshold
allowed failed percent | integer | Similar to mini_batch_error_threshold, but uses the percent of failed mini-batches instead of the count. | [0, 100] | 100 | N/A | --allowed_failed_percent
overhead timeout | integer | The timeout in seconds for the initialization of each mini-batch. For example, loading the mini-batch data and passing it to the run() function. | (0, 259200] | 600 | N/A | --task_overhead_timeout
progress update timeout | integer | The timeout in seconds for monitoring the progress of mini-batch execution. If no progress updates are received within this timeout, the parallel job is marked as failed. | (0, 259200] | Dynamically calculated by other settings. | N/A | --progress_update_timeout
first task creation timeout | integer | The timeout in seconds for monitoring the time between the start of the job and the run of the first mini-batch. | (0, 259200] | 600 | N/A | --first_task_creation_timeout
logging level | string | Defines which level of logs is dumped to user log files. | INFO, WARNING, or DEBUG | INFO | logging_level | N/A
append row to | string | Aggregates all returns from each run of a mini-batch and outputs them into this file. May reference one of the outputs of the parallel job by using the expression ${{outputs.<output_name>}} | | | task.append_row_to | N/A
copy logs to parent | string | Boolean option for whether to copy the job progress, overview, and logs to the parent pipeline job. | True or False | False | N/A | --copy_logs_to_parent
resource monitor interval | integer | The time interval in seconds to dump node resource usage (for example, cpu, memory) to the log folder under the "logs/sys/perf" path. Note: Frequently dumping resource logs slightly slows down the execution of your mini-batches. Set this value to "0" to stop dumping resource usage. | [0, int.max] | 600 | N/A | --resource_monitor_interval
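The two count-based thresholds can be sketched as follows. This is a simplified model of the rules described in the table, not the service's implementation: it ignores retries, timeouts, and exceptions, and treats a mini-batch as failed only when run() returns fewer items than it received. All function names and the sample results are hypothetical.

```python
from typing import List, Tuple

# Each entry is (input_count, returned_count) for one mini-batch.
MiniBatchResult = Tuple[int, int]


def count_failed_mini_batches(results: List[MiniBatchResult]) -> int:
    """A mini-batch counts as failed when run() returned fewer items than it received."""
    return sum(1 for input_count, returned_count in results if returned_count < input_count)


def count_failed_items(results: List[MiniBatchResult]) -> int:
    """Failed items are the gap between inputs and returns, summed over mini-batches."""
    return sum(max(input_count - returned_count, 0) for input_count, returned_count in results)


def exceeds_threshold(failed: int, threshold: int) -> bool:
    """-1 means ignore all failures; otherwise the job fails when failed > threshold."""
    return threshold != -1 and failed > threshold


# Hypothetical outcome of four mini-batches of 10 items each.
results = [(10, 10), (10, 7), (10, 10), (10, 9)]
failed_batches = count_failed_mini_batches(results)
failed_items = count_failed_items(results)
# mini_batch_error_threshold of 5 and a hypothetical --error_threshold of 3.
job_failed = exceeds_threshold(failed_batches, 5) or exceeds_threshold(failed_items, 3)
print(failed_batches, failed_items, job_failed)  # 2 4 True
```

Here the job stays within the mini-batch threshold (2 failed, 5 allowed) but exceeds the item threshold (4 failed, 3 allowed), so this model marks it as failed.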

Sample code to update these settings:

APPLIES TO: Azure CLI ml extension v2 (current)

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

  task:
    type: run_function
    code: "./script"
    entry_script: iris_prediction.py
    environment:
      name: "prs-env"
      version: 1
      image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
      conda_file: ./environment/environment_parallel.yml
    program_arguments: >-
      --model ${{inputs.score_model}}
      --error_threshold 5
      --allowed_failed_percent 30
      --task_overhead_timeout 1200
      --progress_update_timeout 600
      --first_task_creation_timeout 600
      --copy_logs_to_parent True
      --resource_monitor_interval 20
    append_row_to: ${{outputs.job_output_file}}

Create parallel job in pipeline

APPLIES TO: Azure CLI ml extension v2 (current)

You can create your parallel job inline with your pipeline job:

$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline

display_name: iris-batch-prediction-using-parallel
description: The hello world pipeline job with inline parallel job
tags:
  tag: tagvalue
  owner: sdkteam

settings:
  default_compute: azureml:cpu-cluster

jobs:
  batch_prediction:
    type: parallel
    compute: azureml:cpu-cluster
    inputs:
      input_data: 
        type: mltable
        path: ./neural-iris-mltable
        mode: direct
      score_model: 
        type: uri_folder
        path: ./iris-model
        mode: download
    outputs:
      job_output_file:
        type: uri_file
        mode: rw_mount

    input_data: ${{inputs.input_data}}
    mini_batch_size: "10kb"
    resources:
        instance_count: 2
    max_concurrency_per_instance: 2

    logging_level: "DEBUG"
    mini_batch_error_threshold: 5
    retry_settings:
      max_retries: 2
      timeout: 60

    task:
      type: run_function
      code: "./script"
      entry_script: iris_prediction.py
      environment:
        name: "prs-env"
        version: 1
        image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
        conda_file: ./environment/environment_parallel.yml
      program_arguments: >-
        --model ${{inputs.score_model}}
        --error_threshold 5
        --allowed_failed_percent 30
        --task_overhead_timeout 1200
        --progress_update_timeout 600
        --first_task_creation_timeout 600
        --copy_logs_to_parent True
        --resource_monitor_interval 20
      append_row_to: ${{outputs.job_output_file}}

Submit pipeline job and check parallel step in Studio UI

APPLIES TO: Azure CLI ml extension v2 (current)

You can submit your pipeline job with parallel step by using the CLI command:

az ml job create --file pipeline.yml

Once you submit your pipeline job, the SDK or CLI widget will give you a web URL link to the Studio UI. The link will guide you to the pipeline graph view by default. Double select the parallel step to open the right panel of your parallel job.

To check the settings of your parallel job, navigate to the Parameters tab, expand Run settings, and check the Parallel section:

Screenshot of Azure ML studio on the jobs tab showing the parallel job settings.

To debug a failed parallel job, navigate to the Outputs + Logs tab, expand the logs folder from the output directories on the left, and check job_result.txt to understand why the parallel job failed. For more detail about the logging structure of a parallel job, see the readme.txt under the same folder.

Screenshot of Azure ML studio on the jobs tab showing the parallel job results.
