ParallelRunConfig 类

定义 ParallelRunStep 对象的配置。

有关使用 ParallelRunStep 的示例,请参阅笔记本 https://aka.ms/batch-inference-notebooks

有关故障排除指南,请参阅 https://aka.ms/prstsg。 可在此处找到更多参考资料。

初始化 config 对象。

继承
azureml.pipeline.core._parallel_run_config_base._ParallelRunConfigBase
ParallelRunConfig

构造函数

ParallelRunConfig(environment, entry_script, error_threshold, output_action, compute_target, node_count, process_count_per_node=None, mini_batch_size=None, source_directory=None, description=None, logging_level='INFO', run_invocation_timeout=60, run_max_try=3, append_row_file_name=None, allowed_failed_count=None, allowed_failed_percent=None, partition_keys=None, environment_variables=None)

参数

名称 说明
environment
必需

配置 Python 环境的环境定义。 可以将其配置为使用现有的 Python 环境,也可以将其配置为设置临时试验环境。 环境定义负责定义所需的应用程序依赖项,如 conda 或 pip 包。

entry_script
必需
str

将在多个节点上并行运行的用户脚本。 这指定为本地文件路径。 如果指定了 source_directory,则 entry_script 是目录内部的相对路径。 否则,它可以是计算机上可访问的任何路径。 Entry_script 应包含两个函数:init():应将此函数用于后续推理的任何成本高昂或常见准备,例如,反序列化并将其加载到全局对象。 run(mini_batch):要并行化的方法。 每个调用都具有一个小型批处理。 “mini_batch”:批处理推理将调用 run 方法,并将列表或 Pandas 数据帧作为参数传递给该方法。 如果输入是 FileDataset,则 min_batch 中的每个条目都将为 filepath,如果输入为 TabularDataset,则为 Pandas 数据帧。 run() 方法应返回 Pandas 数据帧或数组。 对于 append_row output_action,这些返回的元素将追加到公共输出文件中。 对于 summary_only,将忽略元素的内容。 对于所有的输出操作,每个返回的输出元素都指示输入微型批处理中输入元素的一个成功推理。 每个并行工作进程只调用一次 init,然后循环运行函数,直到处理完所有的小型批处理。

error_threshold
必需
int

在处理过程中应忽略的 TabularDataset 记录失败数和 FileDataset 文件失败数。 如果错误计数高于此值,则作业将中止。 错误阈值适用于整个输入,而非适用于发送给 run() 方法的单个微型批处理。 范围为 [-1, int.max]。 -1 表示忽略处理过程中的所有失败。

output_action
必需
str

如何组织输出。 当前支持的值为“append_row”和“summary_only”。

  1. “append_row”- run() 方法调用输出的所有值将聚合到一个名为 parallel_run_step.txt 的唯一文件中,该文件在输出位置创建。
  2. “summary_only”- 用户脚本应自行存储输出。 对于已处理的每个成功输入项,仍需要一个输出行。 系统仅将此输出用于错误阈值计算(忽略行的实际值)。
compute_target
必需

用于 ParallelRunStep 执行的计算目标。 此参数可以指定为计算目标对象或工作区中计算目标的名称。

node_count
必需
int

计算目标中用于运行 ParallelRunStep 的节点数。

process_count_per_node
int

每个节点并行运行入口脚本的工作进程数。 对于 GPU 计算机,默认值为 1。 对于 CPU 计算机,默认值是核心数。 工作进程会通过传递它获取的微型批来反复调用 run()。 作业中的工作进程总数为 process_count_per_node * node_count,这个数字决定了要并行执行的 run() 的最大数目。

默认值: None
mini_batch_size

对于 FileDataset 输入,此字段是用户脚本在一次 run() 调用中可以处理的文件数。 对于 TabularDataset 输入,此字段是用户脚本可以在一次 run() 调用中处理的数据的近似大小。 示例值为 1024、1024KB、10MB 和1GB。 (可选;对于 FileDataset,默认值为 10 个文件,对于 TabularDataset,默认值为 1MB。)

默认值: None
source_directory
str

指向文件夹的路径,这些文件夹中包含用于在计算目标上执行的 entry_script 和支持文件。

默认值: None
description
str

提供用于显示的 Batch 服务的说明。

默认值: None
logging_level
str

在“日志记录”中定义的日志记录级别名称的字符串。 可能的值为“WARNING”、“INFO”和“DEBUG”。 (可选,默认值为“INFO”。)

默认值: INFO
run_invocation_timeout
int

每次调用 run() 方法的超时时间(以秒为单位)。 (可选,默认值为 60。)

默认值: 60
run_max_try
int

针对失败或已超时微批的最大尝试次数。 范围为 [1,int.max]。 默认值为 3。 不会再次处理其取消排队计数大于此值的微批,将直接将其删除。

默认值: 3
append_row_file_name
str

如果 output_action 为“append_row”,则为输出文件的名称。 (可选,默认值为 "parallel_run_step.txt" )

默认值: None
allowed_failed_count
int

处理过程中应忽略的已失败的小型批处理数。 如果失败计数高于此值,则作业将中止。 此阈值适用于整个输入,而不是发送给运行 () 方法的单个小型批处理。 范围为 [-1, int.max]。 -1 表示忽略处理过程中的所有失败。 小型批处理可能会在第一次处理时失败,并在第二次尝试时成功。 在第一次和第二次之间进行检查会将其计为失败。 第二次检查后,不会将其计为失败。 参数– error_threshold、– allowed_failed_count 和– allowed_failed_percent 可以一起工作。 如果指定了多个,则在该作业超过其中任何一个时,它将中止。

默认值: None
allowed_failed_percent

处理过程中应忽略的已失败的小型批处理百分比。 如果失败百分比高于此值,则作业将中止。 此阈值适用于整个输入,而不是发送给运行 () 方法的单个小型批处理。 范围为 [0,100]。 100 或 100.0 表示忽略处理过程中的所有失败。 计划好所有小型批处理后开始检查。 参数– error_threshold、– allowed_failed_count 和– allowed_failed_percent 可以一起工作。 如果指定了多个,则在该作业超过其中任何一个时,它将中止。

默认值: None
partition_keys

用于将数据集分区为小型批处理的键。 如果指定此参数,则具有相同键的数据将分区到相同的小型批处理中。 如果同时指定 partition_keys 和 mini_batch_size,将引发错误。 它应该是 str 元素列表,每个元素都是用于对输入数据集进行分区的键。 但是,如果升级到 PipelineParameter,则默认值应是列表的 json 转储 str,因为 PipelineParameter 中目前不支持 list 类型。 输入 (s) 必须是 () 的分区数据集,并且 partition_keys 必须是每个输入数据集的键的子集才能运行。

默认值: None
environment_variables

环境变量名称和值的字典。 这些环境变量是在执行用户脚本的进程上设置的。

默认值: None
environment
必需

配置 Python 环境的环境定义。 可以将其配置为使用现有的 Python 环境,也可以将其配置为设置临时试验环境。 环境定义负责定义所需的应用程序依赖项,如 conda 或 pip 包。

entry_script
必需
str

将在多个节点上并行运行的用户脚本。 这指定为本地文件路径。 如果指定了 source_directory,则 entry_script 是目录内部的相对路径。 否则,它可以是计算机上可访问的任何路径。 Entry_script 应包含两个函数:init():应将此函数用于后续推理的任何成本高昂或常见准备,例如,反序列化并将其加载到全局对象。 run(mini_batch):要并行化的方法。 每个调用都具有一个小型批处理。 “mini_batch”:批处理推理将调用 run 方法,并将列表或 Pandas 数据帧作为参数传递给该方法。 如果输入是 FileDataset,则 min_batch 中的每个条目都将为 filepath,如果输入为 TabularDataset,则为 Pandas 数据帧。 run() 方法应返回 Pandas 数据帧或数组。 对于 append_row output_action,这些返回的元素将追加到公共输出文件中。 对于 summary_only,将忽略元素的内容。 对于所有的输出操作,每个返回的输出元素都指示输入微型批处理中输入元素的一个成功推理。 每个并行工作进程只调用一次 init,然后循环运行函数,直到处理完所有的小型批处理。

error_threshold
必需
int

在处理过程中应忽略的 TabularDataset 记录失败数和 FileDataset 文件失败数。 如果错误计数高于此值,则作业将中止。 错误阈值适用于整个输入,而非适用于发送给 run() 方法的单个微型批处理。 范围为 [-1, int.max]。 -1 表示忽略处理过程中的所有失败。

output_action
必需
str

如何组织输出。 当前支持的值为“append_row”和“summary_only”。

  1. “append_row”- run() 方法调用输出的所有值将聚合到一个名为 parallel_run_step.txt 的唯一文件中,该文件在输出位置创建。
  2. “summary_only”- 用户脚本应自行存储输出。 对于已处理的每个成功输入项,仍需要一个输出行。 系统仅将此输出用于错误阈值计算(忽略行的实际值)。
compute_target
必需

用于 ParallelRunStep 执行的计算目标。 此参数可以指定为计算目标对象或工作区中计算目标的名称。

node_count
必需
int

计算目标中用于运行 ParallelRunStep 的节点数。

process_count_per_node
必需
int

每个节点并行运行入口脚本的工作进程数。 对于 GPU 计算机,默认值为 1。 对于 CPU 计算机,默认值为核心数。 工作进程会通过传递它获取的微型批来反复调用 run()。 作业中的工作进程总数为 process_count_per_node * node_count,这个数字决定了要并行执行的 run() 的最大数目。

mini_batch_size
必需
strint

对于 FileDataset 输入,此字段是用户脚本在一次 run() 调用中可以处理的文件数。 对于 TabularDataset 输入,此字段是用户脚本可以在一次 run() 调用中处理的数据的近似大小。 示例值为 1024、1024KB、10MB 和1GB。 (可选;对于 FileDataset,默认值为 10 个文件,对于 TabularDataset,默认值为 1MB。)

source_directory
必需
str

指向文件夹的路径,这些文件夹中包含用于在计算目标上执行的 entry_script 和支持文件。

description
必需
str

提供用于显示的 Batch 服务的说明。

logging_level
必需
str

在“日志记录”中定义的日志记录级别名称的字符串。 可能的值为“WARNING”、“INFO”和“DEBUG”。 (可选,默认值为“INFO”。)

run_invocation_timeout
必需
int

每次调用 run() 方法的超时时间(以秒为单位)。 (可选,默认值为 60。)

run_max_try
必需
int

针对失败或已超时微批的最大尝试次数。 范围为 [1,int.max]。 默认值为 3。 不会再次处理其取消排队计数大于此值的微批,将直接将其删除。

append_row_file_name
必需
str

如果 output_action 为“append_row”,则为输出文件的名称。 (可选,默认值为 "parallel_run_step.txt" )

allowed_failed_count
必需
int

处理过程中应忽略的已失败的小型批处理数。 如果失败计数高于此值,则作业将中止。 此阈值适用于整个输入,而不是发送给运行 () 方法的单个小型批处理。 范围为 [-1, int.max]。 -1 表示忽略处理过程中的所有失败。 小型批处理可能会在第一次处理时失败,并在第二次尝试时成功。 在第一次和第二次之间进行检查会将其计为失败。 第二次检查后,不会将其计为失败。 参数– error_threshold、– allowed_failed_count 和– allowed_failed_percent 可以一起工作。 如果指定了多个,则在该作业超过其中任何一个时,它将中止。

allowed_failed_percent
必需

处理过程中应忽略的已失败的小型批处理百分比。 如果失败百分比高于此值,则作业将中止。 此阈值适用于整个输入,而不是发送给运行 () 方法的单个小型批处理。 范围为 [0,100]。 100 或 100.0 表示忽略处理过程中的所有失败。 计划好所有小型批处理后开始检查。 参数– error_threshold、– allowed_failed_count 和– allowed_failed_percent 可以一起工作。 如果指定了多个,则在该作业超过其中任何一个时,它将中止。

partition_keys
必需

用于将数据集分区为小型批处理的键。 如果指定此参数,则具有相同键的数据将分区到相同的小型批处理中。 如果同时指定 partition_keys 和 mini_batch_size,将引发错误。 它应该是 str 元素列表,每个元素都是用于对输入数据集进行分区的键。 但是,如果升级到 PipelineParameter,则默认值应是列表的 json 转储 str,因为 PipelineParameter 中目前不支持 list 类型。 输入 (s) 必须是 () 的分区数据集,并且 partition_keys 必须是每个输入数据集的键的子集才能运行。

environment_variables
必需

环境变量名称和值的字典。 这些环境变量是在执行用户脚本的进程上设置的。

注解

ParallelRunConfig 类用于提供 ParallelRunStep 类的配置。 ParallelRunConfig 和 ParallelRunStep 可一起用于并行处理大量数据。 常见的用例是训练 ML 模型或运行脱机推理,以根据一批观察数据来生成预测。 ParallelRunStep 的工作原理是将数据分解成并行处理的批处理。 可以通过 ParallelRunConfig 类控制批大小、节点计数和其他可优化参数以加快并行处理速度。 ParallelRunStep 可以作为输入用于 TabularDatasetFileDataset

若要使用 ParallelRunStep 和 ParallelRunConfig:

  • 创建一个 ParallelRunConfig 对象用于指定批处理的执行方式,该对象的参数可控制批大小、每个计算目标的节点数,以及对自定义 Python 脚本的引用。

  • 创建使用 ParallelRunConfig 对象的 ParallelRunStep 对象,并定义该步骤的输入和输出。

  • 像使用其他管道步骤类型一样,在Pipeline中使用已配置的 ParallelRunStep 对象。

以下文章讨论了使用 ParallelRunStep 和 ParallelRunConfig 类进行批量推理的示例:


   from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

   parallel_run_config = ParallelRunConfig(
       source_directory=scripts_folder,
       entry_script=script_file,
       mini_batch_size="5",        # or partition_keys=["key1", "key2"], which is another way to partition the
                                   # input to mini-batches, refer to the parameter description for details
       error_threshold=10,         # Optional, allowed failed count on mini batch items
       allowed_failed_count=15,    # Optional, allowed failed count on mini batches
       allowed_failed_percent=10,  # Optional, allowed failed percent on mini batches
       run_max_try=3,
       output_action="append_row",
       environment=batch_env,
       compute_target=compute_target,
       node_count=2)

   parallelrun_step = ParallelRunStep(
       name="predict-digits-mnist",
       parallel_run_config=parallel_run_config,
       inputs=[ named_mnist_ds ],
       output=output_dir,
       arguments=[ "--extra_arg", "example_value" ],
       allow_reuse=True
   )

有关此示例详细信息,请参阅笔记本 https://aka.ms/batch-inference-notebooks

方法

load_yaml

从 YAML 文件加载并行运行配置数据。

save_to_yaml

将并行运行配置数据导出到 YAML 文件。

load_yaml

从 YAML 文件加载并行运行配置数据。

static load_yaml(workspace, path)

参数

名称 说明
workspace
必需

从中读取配置数据的工作区。

path
必需
str

从中加载配置的路径。

save_to_yaml

将并行运行配置数据导出到 YAML 文件。

save_to_yaml(path)

参数

名称 说明
path
必需
str

要将文件保存到的路径。