ParallelRunConfig Class
Defines configuration for a ParallelRunStep object.
For an example of using ParallelRunStep, see the notebook https://aka.ms/batch-inference-notebooks.
For a troubleshooting guide, see https://aka.ms/prstsg, which also lists further references.
Initialize the config object.
- Inheritance
azureml.pipeline.core._parallel_run_config_base._ParallelRunConfigBase
ParallelRunConfig
Constructor
ParallelRunConfig(environment, entry_script, error_threshold, output_action, compute_target, node_count, process_count_per_node=None, mini_batch_size=None, source_directory=None, description=None, logging_level='INFO', run_invocation_timeout=60, run_max_try=3, append_row_file_name=None, allowed_failed_count=None, allowed_failed_percent=None, partition_keys=None, environment_variables=None)
Parameters
- environment
- Environment
The environment definition that configures the Python environment. It can be configured to use an existing Python environment or to set up a temp environment for the experiment. The environment definition is responsible for defining the required application dependencies, such as conda or pip packages.
- entry_script
- str
User script which will be run in parallel on multiple nodes. This is specified as a local file path. If source_directory is specified, then entry_script is a relative path inside the directory. Otherwise, it can be any path accessible on the machine.
The entry_script should contain two functions:
init(): This function should be used for any costly or common preparation for subsequent inferences, e.g., deserializing and loading the model into a global object.
run(mini_batch): The method to be parallelized. Each invocation will have one mini-batch.
mini_batch: Batch inference will invoke the run method and pass either a list or a Pandas DataFrame as an argument to the method. Each entry in mini_batch will be a file path if the input is a FileDataset, or a Pandas DataFrame if the input is a TabularDataset.
The run() method should return a Pandas DataFrame or an array. For the append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of an input element in the input mini-batch.
Each parallel worker process will call init once and then loop over the run function until all mini-batches are processed.
- error_threshold
- int
The number of record failures for TabularDataset and file failures for FileDataset that should be ignored during processing. If the error count goes above this value, then the job will be aborted. Error threshold is for the entire input and not for individual mini-batches sent to run() method. The range is [-1, int.max]. -1 indicates ignore all failures during processing.
- output_action
- str
How the output should be organized. Current supported values are 'append_row' and 'summary_only'.
- 'append_row' – All values output by run() method invocations will be aggregated into one unique file named parallel_run_step.txt which is created in the output location.
- 'summary_only' – User script is expected to store the output itself. An output row is still expected for each successful input item processed. The system uses this output only for error threshold calculation (ignoring the actual value of the row).
- compute_target
- AmlCompute or str
Compute target to use for ParallelRunStep execution. This parameter may be specified as a compute target object or the name of a compute target in the workspace.
- node_count
- int
The number of nodes in the compute target used for running the ParallelRunStep.
- process_count_per_node
- int
The number of worker processes per node to run the entry script in parallel. For a GPU machine, the default value is 1. For a CPU machine, the default value is the number of cores. A worker process will call run() repeatedly by passing the mini-batch it gets. The total number of worker processes in your job is process_count_per_node * node_count, which decides the max number of run() calls to execute in parallel.
- mini_batch_size
- str or int
For FileDataset input, this field is the number of files a user script can process in one run() call. For TabularDataset input, this field is the approximate size of data the user script can process in one run() call. Example values are 1024, 1024KB, 10MB, and 1GB. (optional, default value is 10 files for FileDataset and 1MB for TabularDataset.)
- source_directory
- str
Path to the folder that contains the entry_script and supporting files used to execute on the compute target.
- description
- str
A description to give the batch service used for display purposes.
- logging_level
- str
A string of the logging level name, which is defined in 'logging'. Possible values are 'WARNING', 'INFO', and 'DEBUG'. (optional, default value is 'INFO'.)
- run_invocation_timeout
- int
Timeout in seconds for each invocation of the run() method. (optional, default value is 60.)
- run_max_try
- int
The maximum number of tries for a failed or timed-out mini-batch. The range is [1, int.max]. The default value is 3. A mini-batch with a dequeue count greater than this won't be processed again and will be deleted directly.
- append_row_file_name
- str
The name of the output file if the output_action is 'append_row'. (optional, default value is 'parallel_run_step.txt')
- allowed_failed_count
- int
The number of failed mini-batches that should be ignored during processing. If the failed count goes above this value, the job will be aborted. This threshold is for the entire input rather than the individual mini-batch sent to the run() method. The range is [-1, int.max]. -1 indicates that all failures during processing are ignored. A mini-batch may fail the first time it's processed and then succeed on the second try. A check made between the first and second tries counts it as failed; a check made after the second try does not. The arguments error_threshold, allowed_failed_count, and allowed_failed_percent can be used together. If more than one is specified, the job will be aborted if it exceeds any of them.
- allowed_failed_percent
- float
The percent of failed mini-batches that should be ignored during processing. If the failed percent goes above this value, the job will be aborted. This threshold is for the entire input rather than the individual mini-batch sent to the run() method. The range is [0, 100]. 100 or 100.0 indicates that all failures during processing are ignored. The check starts after all mini-batches have been scheduled. The arguments error_threshold, allowed_failed_count, and allowed_failed_percent can be used together. If more than one is specified, the job will be aborted if it exceeds any of them.
- partition_keys
- <xref:List>[str]
The keys used to partition the dataset into mini-batches. If specified, data with the same key will be partitioned into the same mini-batch. If both partition_keys and mini_batch_size are specified, an error is raised. It should be a list of str elements, each being a key used to partition the input dataset. However, if promoted to PipelineParameter, the default value should be the JSON dump str of the list, because the list type is not currently supported in PipelineParameter. The input(s) must be partitioned dataset(s), and the partition_keys must be a subset of the keys of every input dataset for this to work.
- environment_variables
- dict
A dictionary of environment variable names and values. These environment variables are set on the process where the user script is being executed.
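The three failure thresholds (error_threshold, allowed_failed_count, and allowed_failed_percent) can be combined, and the job is aborted when any one of them is exceeded. The sketch below illustrates only that rule. The function should_abort and its arguments are hypothetical names, not part of the azureml SDK, and the service's actual bookkeeping (retries, the timing of the percent check) is not modeled.

```python
from typing import Optional


def should_abort(
    failed_items: int,
    failed_batches: int,
    total_batches: int,
    error_threshold: int = -1,
    allowed_failed_count: Optional[int] = None,
    allowed_failed_percent: Optional[float] = None,
) -> bool:
    """Return True if any configured threshold is exceeded (illustrative only)."""
    # -1 means "ignore all item-level failures".
    if error_threshold != -1 and failed_items > error_threshold:
        return True
    # None means "not specified"; -1 means "ignore all failed mini-batches".
    if allowed_failed_count not in (None, -1) and failed_batches > allowed_failed_count:
        return True
    if allowed_failed_percent is not None and total_batches > 0:
        failed_percent = 100.0 * failed_batches / total_batches
        if failed_percent > allowed_failed_percent:
            return True
    return False
```

With allowed_failed_percent=100 the last check can never trigger, which matches the description that 100 ignores all failures.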
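When partition_keys is promoted to a PipelineParameter, its description says the default value should be the JSON dump of the list. A minimal sketch of that serialization using only the standard library follows; the key names are placeholders, and wrapping the resulting string in a PipelineParameter is left out.

```python
import json

# Hypothetical partition keys; replace with keys present in every input dataset.
partition_keys = ["key1", "key2"]

# JSON-encode the list so it can be passed as a string default value.
partition_keys_json = json.dumps(partition_keys)

# The string round-trips back to the original list.
assert json.loads(partition_keys_json) == partition_keys
print(partition_keys_json)  # prints ["key1", "key2"]
```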
Remarks
The ParallelRunConfig class is used to provide configuration for the ParallelRunStep class. ParallelRunConfig and ParallelRunStep can be used together for processing large amounts of data in parallel. Common use cases are training an ML model or running offline inference to generate predictions on a batch of observations. ParallelRunStep works by breaking up your data into batches that are processed in parallel. The batch size, node count, and other tunable parameters to speed up your parallel processing can be controlled with the ParallelRunConfig class. ParallelRunStep can work with either TabularDataset or FileDataset as input.
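As a rough illustration of how these tunable parameters interact, the sketch below computes the total number of worker processes (process_count_per_node * node_count, as stated in the process_count_per_node description) and the number of mini-batches for a FileDataset, where mini_batch_size is a file count. The helper names are hypothetical and the numbers are made up; the actual scheduling is done by the service.

```python
import math


def total_workers(node_count: int, process_count_per_node: int) -> int:
    """Maximum number of run() calls that can execute at the same time."""
    return node_count * process_count_per_node


def file_mini_batches(file_count: int, mini_batch_size: int) -> int:
    """Number of mini-batches when each holds up to mini_batch_size files."""
    return math.ceil(file_count / mini_batch_size)


workers = total_workers(node_count=2, process_count_per_node=4)
batches = file_mini_batches(file_count=1000, mini_batch_size=10)
print(workers, batches)  # prints 8 100
```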
To use ParallelRunStep and ParallelRunConfig:
Create a ParallelRunConfig object to specify how batch processing is performed, with parameters to control batch size, number of nodes per compute target, and a reference to your custom Python script.
Create a ParallelRunStep object that uses the ParallelRunConfig object and defines inputs and outputs for the step.
Use the configured ParallelRunStep object in a Pipeline just as you would with other pipeline step types.
Examples of working with ParallelRunStep and ParallelRunConfig classes for batch inference are discussed in the following articles:
Tutorial: Build an Azure Machine Learning pipeline for batch scoring. This article shows how to use these two classes for asynchronous batch scoring in a pipeline and enable a REST endpoint to run the pipeline.
Run batch inference on large amounts of data by using Azure Machine Learning. This article shows how to process large amounts of data asynchronously and in parallel with a custom inference script and a pre-trained image classification model based on the MNIST dataset.
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

# scripts_folder, script_file, batch_env, compute_target, named_mnist_ds,
# and output_dir are assumed to be defined earlier (see the linked notebook).
parallel_run_config = ParallelRunConfig(
    source_directory=scripts_folder,
    entry_script=script_file,
    mini_batch_size="5",        # or partition_keys=["key1", "key2"], which is another way to partition the
                                # input to mini-batches, refer to the parameter description for details
    error_threshold=10,         # Optional, allowed failed count on mini batch items
    allowed_failed_count=15,    # Optional, allowed failed count on mini batches
    allowed_failed_percent=10,  # Optional, allowed failed percent on mini batches
    run_max_try=3,
    output_action="append_row",
    environment=batch_env,
    compute_target=compute_target,
    node_count=2)

parallelrun_step = ParallelRunStep(
    name="predict-digits-mnist",
    parallel_run_config=parallel_run_config,
    inputs=[named_mnist_ds],
    output=output_dir,
    arguments=["--extra_arg", "example_value"],
    allow_reuse=True
)
For more information about this example, see the notebook https://aka.ms/batch-inference-notebooks.
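The script passed as entry_script must define init() and run(mini_batch). A minimal sketch for a FileDataset input, where each mini-batch is a list of file paths, might look like the following. The "model" here is a stand-in function (a real script would load a trained model in init()), and the result format is an assumption made for illustration. One element is returned per input so that each one counts as a successfully processed item.

```python
import os

model = None  # populated once per worker process by init()


def init():
    """One-time setup per worker process, e.g. loading a model."""
    global model
    # Placeholder "model": returns the length of the file name.
    # A real script would deserialize a trained model here instead.
    model = lambda path: len(os.path.basename(path))


def run(mini_batch):
    """Process one mini-batch; for FileDataset input this is a list of file paths."""
    results = []
    for file_path in mini_batch:
        prediction = model(file_path)
        # One returned element per successfully processed input.
        results.append(f"{os.path.basename(file_path)}: {prediction}")
    return results
```

With output_action="append_row", each returned element would be appended to the common output file; with "summary_only", only the number of returned elements matters.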
Methods
- load_yaml: Load parallel run configuration data from a YAML file.
- save_to_yaml: Export parallel run configuration data to a YAML file.
load_yaml
Load parallel run configuration data from a YAML file.
static load_yaml(workspace, path)
Parameters
save_to_yaml
Export parallel run configuration data to a YAML file.
save_to_yaml(path)
Parameters