你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将数据移入机器学习管道和在机器学习管道之间移动数据的步骤 (Python)

适用于:适用于 Python 的 Azure 机器学习 SDK v1

重要

本文提供有关使用 Azure 机器学习 SDK v1 的信息。 SDK v1 自 2025 年 3 月 31 日起弃用。 对它的支持将于 2026 年 6 月 30 日结束。 可以在该日期之前安装和使用 SDK v1。

建议在 2026 年 6 月 30 日之前过渡到 SDK v2。 有关 SDK v2 的详细信息,请参阅 什么是 Azure 机器学习 CLI 和 Python SDK v2? 以及 SDK v2 参考

本文提供用于在 Azure 机器学习管道中的步骤之间导入数据、转换数据以及移动数据的代码。 有关数据在 Azure 机器学习中的工作原理的概述,请参阅访问 Azure 存储服务中的数据。 有关 Azure 机器学习管道的优点和结构的信息,请参阅什么是 Azure 机器学习管道?

本文介绍如何执行以下操作:

  • Dataset 对象用于预先存在的数据
  • 在步骤中访问数据
  • Dataset 数据拆分为子集,例如训练子集和验证子集
  • 创建 OutputFileDatasetConfig 对象来将数据传输到下一管道步骤
  • 使用 OutputFileDatasetConfig 对象作为管道步骤的输入
  • 从要保留的 Dataset 创建新的 OutputFileDatasetConfig 对象

先决条件

Dataset 对象用于预先存在的数据

将数据引入到管道的首选方法是使用 Dataset 对象。 Dataset 对象表示在整个工作区中可用的持久性数据。

可以通过许多方法来创建和注册 Dataset 对象。 表格数据集适用于一个或多个文件中可用的带分隔符的数据。 文件数据集用于二进制数据(例如图像)或要分析的数据。 创建 Dataset 对象的最简单编程方式是使用工作区存储或公共 URL 中的现有 blob:

datastore = Datastore.get(workspace, 'training_data')
iris_dataset = Dataset.Tabular.from_delimited_files(DataPath(datastore, 'iris.csv'))

datastore_path = [
    DataPath(datastore, 'animals/dog/1.jpg'),
    DataPath(datastore, 'animals/dog/2.jpg'),
    DataPath(datastore, 'animals/cat/*.jpg')
]
cats_dogs_dataset = Dataset.File.from_files(path=datastore_path)

有关使用不同选项和不同源创建数据集、在 Azure 机器学习 UI 中注册和查看数据集、了解数据大小如何与计算容量交互以及版本控制的详细信息,请参阅 “创建 Azure 机器学习数据集”。

将数据集传递给脚本

若要将数据集的路径传递给你的脚本,请使用 Dataset 对象的 as_named_input() 方法。 可以将生成的 DatasetConsumptionConfig 对象作为参数传递给脚本,也可以使用管道脚本的参数 inputs 来检索数据集 Run.get_context().input_datasets[]

创建命名输入后,可以选择其访问模式( FileDataset 仅适用于): as_mount()as_download()。 如果脚本处理数据集中的所有文件,并且计算资源上的磁盘足够大,则下载访问模式是更好的选择。 下载访问模式可避免在运行时流式传输数据所产生的开销。 如果脚本访问数据集的子集或计算资源过大,请使用装载访问模式。 有关详细信息,请参阅 装载与下载

将数据集传递给管道步骤:

  1. 使用 TabularDataset.as_named_input()FileDataset.as_named_input() (末尾没有 创建 DatasetConsumptionConfig 对象
  2. 仅适用于 FileDataset 使用 as_mount()as_download() 设置访问模式。 使用 TabularDataset时,无法设置访问模式。
  3. 使用 argumentsinputs 将数据集传递给管道步骤。

以下代码片段展示了如何在 PythonScriptStep 构造函数中使用 iris_datasetTabularDataset)来组合这些步骤的常见模式:


train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[iris_dataset.as_named_input('iris')]
)

注意

你需要将所有这些参数(即, "train_data""train.py"clusteriris_dataset)的值替换为你自己的数据。 上面的代码片段只是显示调用的形式,不属于Microsoft示例。

还可以使用类似 random_split()take_sample() 创建多个输入的方法,或减少传递到管道步骤的数据量:

seed = 42 # PRNG seed
smaller_dataset = iris_dataset.take_sample(0.1, seed=seed) # 10%
train, test = smaller_dataset.random_split(percentage=0.8, seed=seed)

train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[train.as_named_input('train'), test.as_named_input('test')]
)

在脚本中访问数据集

管道步骤脚本的命名输入可以在 Run 对象中用作字典。 使用Run检索活动Run.get_context()对象,然后使用input_datasets检索命名输入的字典。 如果通过DatasetConsumptionConfig参数而不是arguments参数传递inputs对象,请使用ArgumentParser代码访问数据。 以下代码片段演示了这两种技术:

管道定义脚本

# Code is for demonstration only: It would be confusing to split datasets between `arguments` and `inputs`
train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    # Datasets passed as arguments
    arguments=['--training-folder', train.as_named_input('train').as_download()],
    # Datasets passed as inputs
    inputs=[test.as_named_input('test').as_download()]
)

从 PythonScriptStep 引用的 train.py 脚本

# In pipeline script
parser = argparse.ArgumentParser()
# Retrieve the dataset passed as an argument
parser.add_argument('--training-folder', type=str, dest='train_folder', help='training data folder mounting point')
args = parser.parse_args()
training_data_folder = args.train_folder
# Retrieve the dataset passed as an input
testing_data_folder = Run.get_context().input_datasets['test']

传递的值是数据集文件或文件的路径。

由于已注册的数据集是永久性的,并且跨工作区共享,因此可以直接检索它们:

run = Run.get_context()
ws = run.experiment.workspace
ds = Dataset.get_by_name(workspace=ws, name='mnist_opendataset')

注意

前面的代码片段显示了调用的形式。 它们不属于Microsoft示例。 需要将参数替换为自己项目中的值。

OutputFileDatasetConfig 用于中间数据

尽管 Dataset 对象仅表示持久性数据,OutputFileDatasetConfig 对象可用于管道步骤中的临时数据输出以及持久性输出数据。 OutputFileDatasetConfig 支持将数据写入 Blob 存储、文件共享、Azure Data Lake Storage Gen1 或 Data Lake Storage Gen2。 它同时支持装载模式和上传模式。 在装载模式下,当文件关闭时,写入到装载的目录中的文件将永久存储。 在上传模式下,在作业结束时,将上传写入到输出目录中的文件。 如果作业失败或取消,则不会上传输出目录。

对象 OutputFileDatasetConfig 的默认行为是写入工作区的默认数据存储。 使用OutputFileDatasetConfig参数将PythonScriptStep对象传递给arguments

from azureml.data import OutputFileDatasetConfig
dataprep_output = OutputFileDatasetConfig()
input_dataset = Dataset.get_by_name(workspace, 'raw_data')

dataprep_step = PythonScriptStep(
    name="prep_data",
    script_name="dataprep.py",
    compute_target=cluster,
    arguments=[input_dataset.as_named_input('raw_data').as_mount(), dataprep_output]
    )

注意

OutputFileDatasetConfig 进行的并发写入会失败。 不要尝试同时使用单个 OutputFileDatasetConfig 。 不要在多处理情况下共享一个 OutputFileDatasetConfig ,就像使用 分布式训练时一样。

OutputFileDatasetConfig 用作训练步骤的输出

在你的管道中PythonScriptStep,可以通过使用程序的参数来检索可用的输出路径。 如果此步骤是第一步,并且将初始化输出数据,则需要在指定的路径上创建目录。 然后,您可以编写任何要包含在 OutputFileDatasetConfig 中的文件。

parser = argparse.ArgumentParser()
parser.add_argument('--output_path', dest='output_path', required=True)
args = parser.parse_args()

# Make directory for file
os.makedirs(os.path.dirname(args.output_path), exist_ok=True)
with open(args.output_path, 'w') as f:
    f.write("Step 1's output")

读取 OutputFileDatasetConfig 作为非初始步骤的输入

当初始管道步骤将一些数据写入到 OutputFileDatasetConfig 路径并且这些数据成为该初始步骤的输出后,可将其用作后面步骤的输入。

在以下代码中:

  • step1_output_data 指示在上传访问模式下将 PythonScriptStepstep1 的输出写入到 Data Lake Storage Gen2 数据存储 my_adlsgen2 中。 有关设置角色权限以将数据写回到 Data Lake Storage Gen2 数据存储的信息,请参阅 使用数据存储连接到 Azure 上的存储服务

  • step1完成并将输出写入step1_output_data指示的目标后,step2已准备好将step1_output_data用作输入。

# Get Data Lake Storage Gen2 datastore that's already registered with the workspace
datastore = workspace.datastores['my_adlsgen2']
step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()

step1 = PythonScriptStep(
    name="generate_data",
    script_name="step1.py",
    runconfig = aml_run_config,
    arguments = ["--output_path", step1_output_data]
)

step2 = PythonScriptStep(
    name="read_pipeline_data",
    script_name="step2.py",
    compute_target=compute,
    runconfig = aml_run_config,
    arguments = ["--pd", step1_output_data.as_input()]

)

pipeline = Pipeline(workspace=ws, steps=[step1, step2])

提示

在 Python 脚本 step2.py 中读取数据的过程与 之前在脚本中的 Access 数据集中所述的过程相同。 使用 ArgumentParser 在脚本中添加 --pd 参数以访问数据。

注册 OutputFileDatasetConfig 对象供重复使用

如果希望某个 OutputFileDatasetConfig 对象在实验结束后依然可用,请将其注册到工作区,以便在不同实验之间进行共享和重复使用。

step1_output_ds = step1_output_data.register_on_complete(
    name='processed_data', 
    description = 'files from step1'
)

不再需要时删除OutputFileDatasetConfig的内容

Azure 不会自动删除用 OutputFileDatasetConfig 写入的中间数据。 若要避免大量不需要的数据的存储费用,应执行以下作之一:

  • 不再需要中间数据时,可在管道作业结束时以编程方式删除这些数据。

  • 使用 Blob 存储和临时存储策略来管理中间数据。 (请参阅 通过自动执行 Azure Blob 存储访问层来优化成本。此策略只能在工作区的非默认数据存储上设置。 使用 OutputFileDatasetConfig 将中间数据导出到另一个非默认数据存储。

    # Get Data Lake Storage Gen2 datastore that's already registered with the workspace
    datastore = workspace.datastores['my_adlsgen2']
    step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()
    
  • 定期查看数据并删除不需要的数据。

警告

仅删除自数据上次更改日期起 30 天后的中间数据。 删除先前的中间数据可能会导致管道运行失败,因为管道假定数据在 30 天内存在以供重复使用。

有关详细信息,请参阅计划管理 Azure 机器学习的成本

后续步骤