你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

parallel 包

ParallelJob

并行作业。

RunFunction

运行函数。

函数

parallel_run_function

创建一个 Parallel 对象,该对象可在 dsl.pipeline 中用作函数,也可以创建为独立的并行作业。

有关使用 ParallelRunStep 的示例,请参阅笔记本 https://aka.ms/parallel-example-notebook


   from azure.ai.ml import Input, Output, parallel

   parallel_run = parallel_run_function(
       name="batch_score_with_tabular_input",
       display_name="Batch Score with Tabular Dataset",
       description="parallel component for batch score",
       inputs=dict(
           job_data_path=Input(
               type=AssetTypes.MLTABLE,
               description="The data to be split and scored in parallel",
           ),
           score_model=Input(
               type=AssetTypes.URI_FOLDER, description="The model for batch score."
           ),
       ),
       outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
       input_data="${{inputs.job_data_path}}",
       max_concurrency_per_instance=2,  # Optional, default is 1
       mini_batch_size="100",  # optional
       mini_batch_error_threshold=5,  # Optional, allowed failed count on mini batch items, default is -1
       logging_level="DEBUG",  # Optional, default is INFO
       error_threshold=5,  # Optional, allowed failed count totally, default is -1
       retry_settings=dict(max_retries=2, timeout=60),  # Optional
       task=RunFunction(
           code="./src",
           entry_script="tabular_batch_inference.py",
           environment=Environment(
               image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04",
               conda_file="./src/environment_parallel.yml",
           ),
           program_arguments="--model ${{inputs.score_model}}",
           append_row_to="${{outputs.job_output_path}}",  # Optional, if not set, summary_only
       ),
   )
parallel_run_function(*, name: str | None = None, description: str | None = None, tags: Dict | None = None, properties: Dict | None = None, display_name: str | None = None, experiment_name: str | None = None, compute: str | None = None, retry_settings: BatchRetrySettings | None = None, environment_variables: Dict | None = None, logging_level: str | None = None, max_concurrency_per_instance: int | None = None, error_threshold: int | None = None, mini_batch_error_threshold: int | None = None, task: RunFunction | None = None, mini_batch_size: str | None = None, partition_keys: List | None = None, input_data: str | None = None, inputs: Dict | None = None, outputs: Dict | None = None, instance_count: int | None = None, instance_type: str | None = None, docker_args: str | None = None, shm_size: str | None = None, identity: ManagedIdentity | AmlToken | None = None, is_deterministic: bool = True, **kwargs) -> Parallel

参数

name
str

创建的并行作业或组件的名称。

description
str

并行的友好说明。

tags
Dict

要附加到此并行的标记。

properties
Dict

资产属性字典。

display_name
str

一个易记名称。

experiment_name
str

将在下创建作业的试验的名称,如果提供 None,则默认值将设置为当前目录名称。 将忽略为管道步骤。

compute
str

如果将并行作业用作组件/函数) ,则不会使用执行并行作业 (计算的名称。

retry_settings
BatchRetrySettings

并行组件运行失败重试

environment_variables
Dict[str, str]

环境变量名称和值的字典。 这些环境变量是在执行用户脚本的进程上设置的。

logging_level
str

在“日志记录”中定义的日志记录级别名称的字符串。 可能的值为“WARNING”、“INFO”和“DEBUG”。 (可选,默认值为“INFO”。)可以通过 PipelineParameter 设置此值。

max_concurrency_per_instance
int

每个计算实例具有的最大并行度。

error_threshold
int

表格数据集的记录失败次数,以及处理过程中应忽略的文件数据集的文件失败次数。 如果错误计数高于此值,则作业将中止。 错误阈值针对的是整个输入,而不是发送到 run () 方法的单个小型批处理。 范围为 [-1, int.max]。 -1 指示忽略处理期间的所有失败

mini_batch_error_threshold
int

应忽略小型批处理失败次数

task
RunFunction

并行任务

mini_batch_size
str

对于 FileDataset 输入,此字段是用户脚本在一次 run() 调用中可以处理的文件数。 对于 TabularDataset 输入,此字段是用户脚本可以在一次 run() 调用中处理的数据的近似大小。 示例值为 1024、1024KB、10MB 和1GB。 (可选,则 FileDataset 的默认值为 10 个文件,对于 TabularDataset,默认值为 1MB。) 此值可以通过 PipelineParameter 设置。

partition_keys
List

用于将数据集分区为小型批处理的键。 如果指定此参数,则具有相同键的数据将分区到相同的小型批处理中。 如果同时指定partition_keys和mini_batch_size,分区键将生效。 输入 () 必须是分区数据集 () ,并且partition_keys必须是每个输入数据集的键子集,这样才能正常工作

input_data
str

输入数据。

inputs
Dict

此并行作业使用的输入字典。

outputs
Dict

此并行的输出

instance_count
int

计算目标使用的可选实例或节点数。 默认值为 1

instance_type
str

计算目标支持的 VM 的可选类型。

docker_args
str

要传递给 Docker run 命令的额外参数。 这将替代系统或本部分中已设置的任何参数。 此参数仅支持 Azure ML 计算类型。

shm_size
str

docker 容器的共享内存块的大小。 它的格式应为 (number) (单位) 其中数字大于 0,单位可以是 b (字节) 、k (千字节) 、m (兆字节) 或 g (千兆字节) 之一。

identity
Union[ <xref:azure.ai.ml._restclient.v2022_02_01_preview.models.ManagedIdentity>, <xref:azure.ai.ml._restclient.v2022_02_01_preview.models.AmlToken>]

在计算上运行训练作业时将使用的标识。

is_deterministic
bool

指定在输入相同的情况下并行作业是否返回相同的输出。 如果并行 (组件) 是确定性的,则当在管道中将其用作节点/步骤时,它将重用当前工作区中以前提交的作业的结果,该作业具有相同的输入和设置。 在这种情况下,此步骤不会使用任何计算资源。 默认值为 True,如果要避免此类重用行为,请指定is_deterministic=False,默认值为 True。

返回

并行节点

返回类型

注解

若要使用 parallel_run_function:

  • 创建一个 <xref:azure.ai.ml.entities._builders.Parallel> 对象以指定并行运行的执行方式,其中包含用于控制批大小的参数、每个计算目标的节点数,以及对自定义 Python 脚本的引用。

  • 使用并行对象作为函数的生成管道。 定义步骤的输入和输出。

  • 对要运行的管道进行求和。