你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于:适用于 Python 的 Azure 机器学习 SDK v1
重要
本文提供有关使用 Azure 机器学习 SDK v1 的信息。 SDK v1 自 2025 年 3 月 31 日起弃用。 对它的支持将于 2026 年 6 月 30 日结束。 可以在该日期之前安装和使用 SDK v1。
建议在 2026 年 6 月 30 日之前过渡到 SDK v2。 有关 SDK v2 的详细信息,请参阅 什么是 Azure 机器学习 CLI 和 Python SDK v2? 以及 SDK v2 参考。
本文提供用于在 Azure 机器学习管道中的步骤之间导入数据、转换数据以及移动数据的代码。 有关数据在 Azure 机器学习中的工作原理的概述,请参阅访问 Azure 存储服务中的数据。 有关 Azure 机器学习管道的优点和结构的信息,请参阅什么是 Azure 机器学习管道?
本文介绍如何执行以下操作:
- 将
Dataset
对象用于预先存在的数据 - 在步骤中访问数据
- 将
Dataset
数据拆分为子集,例如训练子集和验证子集 - 创建
OutputFileDatasetConfig
对象来将数据传输到下一管道步骤 - 使用
OutputFileDatasetConfig
对象作为管道步骤的输入 - 从要保留的
Dataset
创建新的OutputFileDatasetConfig
对象
先决条件
Azure 订阅。 如果没有,请在开始之前创建一个免费帐户。 试用免费版或付费版 Azure 机器学习。
Azure 机器学习工作区。
创建 Azure 机器学习工作区 或通过 Python SDK 使用现有工作区。 导入
Workspace
和Datastore
类,并通过使用函数config.json
从from_config()
文件中加载订阅信息。 此函数默认查找当前目录中的 JSON 文件,但还可以指定路径参数以使用from_config(path="your/file/path")
。import azureml.core from azureml.core import Workspace, Datastore ws = Workspace.from_config()
某些预先存在的数据。 本文简要介绍了如何使用 Azure blob 容器。
可选:现有机器学习管道,例如使用 Azure 机器学习 SDK 创建和运行机器学习管道中所述的管道。
将 Dataset
对象用于预先存在的数据
将数据引入到管道的首选方法是使用 Dataset 对象。 Dataset
对象表示在整个工作区中可用的持久性数据。
可以通过许多方法来创建和注册 Dataset
对象。 表格数据集适用于一个或多个文件中可用的带分隔符的数据。 文件数据集用于二进制数据(例如图像)或要分析的数据。 创建 Dataset
对象的最简单编程方式是使用工作区存储或公共 URL 中的现有 blob:
datastore = Datastore.get(workspace, 'training_data')
iris_dataset = Dataset.Tabular.from_delimited_files(DataPath(datastore, 'iris.csv'))
datastore_path = [
DataPath(datastore, 'animals/dog/1.jpg'),
DataPath(datastore, 'animals/dog/2.jpg'),
DataPath(datastore, 'animals/cat/*.jpg')
]
cats_dogs_dataset = Dataset.File.from_files(path=datastore_path)
有关使用不同选项和不同源创建数据集、在 Azure 机器学习 UI 中注册和查看数据集、了解数据大小如何与计算容量交互以及版本控制的详细信息,请参阅 “创建 Azure 机器学习数据集”。
将数据集传递给脚本
若要将数据集的路径传递给你的脚本,请使用 Dataset
对象的 as_named_input()
方法。 可以将生成的 DatasetConsumptionConfig
对象作为参数传递给脚本,也可以使用管道脚本的参数 inputs
来检索数据集 Run.get_context().input_datasets[]
。
创建命名输入后,可以选择其访问模式( FileDataset
仅适用于): as_mount()
或 as_download()
。 如果脚本处理数据集中的所有文件,并且计算资源上的磁盘足够大,则下载访问模式是更好的选择。 下载访问模式可避免在运行时流式传输数据所产生的开销。 如果脚本访问数据集的子集或计算资源过大,请使用装载访问模式。 有关详细信息,请参阅 装载与下载。
将数据集传递给管道步骤:
- 使用
TabularDataset.as_named_input()
或FileDataset.as_named_input()
(末尾没有 ) 创建DatasetConsumptionConfig
对象 - 仅适用于
FileDataset
: 使用as_mount()
或as_download()
设置访问模式。 使用TabularDataset
时,无法设置访问模式。 - 使用
arguments
或inputs
将数据集传递给管道步骤。
以下代码片段展示了如何在 PythonScriptStep
构造函数中使用 iris_dataset
(TabularDataset
)来组合这些步骤的常见模式:
train_step = PythonScriptStep(
name="train_data",
script_name="train.py",
compute_target=cluster,
inputs=[iris_dataset.as_named_input('iris')]
)
注意
你需要将所有这些参数(即, "train_data"
、 "train.py"
、 cluster
和 iris_dataset
)的值替换为你自己的数据。
上面的代码片段只是显示调用的形式,不属于Microsoft示例。
还可以使用类似 random_split()
和 take_sample()
创建多个输入的方法,或减少传递到管道步骤的数据量:
seed = 42 # PRNG seed
smaller_dataset = iris_dataset.take_sample(0.1, seed=seed) # 10%
train, test = smaller_dataset.random_split(percentage=0.8, seed=seed)
train_step = PythonScriptStep(
name="train_data",
script_name="train.py",
compute_target=cluster,
inputs=[train.as_named_input('train'), test.as_named_input('test')]
)
在脚本中访问数据集
管道步骤脚本的命名输入可以在 Run
对象中用作字典。 使用Run
检索活动Run.get_context()
对象,然后使用input_datasets
检索命名输入的字典。 如果通过DatasetConsumptionConfig
参数而不是arguments
参数传递inputs
对象,请使用ArgumentParser
代码访问数据。 以下代码片段演示了这两种技术:
管道定义脚本
# Code is for demonstration only: It would be confusing to split datasets between `arguments` and `inputs`
train_step = PythonScriptStep(
name="train_data",
script_name="train.py",
compute_target=cluster,
# Datasets passed as arguments
arguments=['--training-folder', train.as_named_input('train').as_download()],
# Datasets passed as inputs
inputs=[test.as_named_input('test').as_download()]
)
从 PythonScriptStep 引用的 train.py
脚本
# In pipeline script
parser = argparse.ArgumentParser()
# Retrieve the dataset passed as an argument
parser.add_argument('--training-folder', type=str, dest='train_folder', help='training data folder mounting point')
args = parser.parse_args()
training_data_folder = args.train_folder
# Retrieve the dataset passed as an input
testing_data_folder = Run.get_context().input_datasets['test']
传递的值是数据集文件或文件的路径。
由于已注册的数据集是永久性的,并且跨工作区共享,因此可以直接检索它们:
run = Run.get_context()
ws = run.experiment.workspace
ds = Dataset.get_by_name(workspace=ws, name='mnist_opendataset')
注意
前面的代码片段显示了调用的形式。 它们不属于Microsoft示例。 需要将参数替换为自己项目中的值。
将 OutputFileDatasetConfig
用于中间数据
尽管 Dataset
对象仅表示持久性数据,OutputFileDatasetConfig
对象可用于管道步骤中的临时数据输出以及持久性输出数据。 OutputFileDatasetConfig
支持将数据写入 Blob 存储、文件共享、Azure Data Lake Storage Gen1 或 Data Lake Storage Gen2。 它同时支持装载模式和上传模式。 在装载模式下,当文件关闭时,写入到装载的目录中的文件将永久存储。 在上传模式下,在作业结束时,将上传写入到输出目录中的文件。 如果作业失败或取消,则不会上传输出目录。
对象 OutputFileDatasetConfig
的默认行为是写入工作区的默认数据存储。 使用OutputFileDatasetConfig
参数将PythonScriptStep
对象传递给arguments
。
from azureml.data import OutputFileDatasetConfig
dataprep_output = OutputFileDatasetConfig()
input_dataset = Dataset.get_by_name(workspace, 'raw_data')
dataprep_step = PythonScriptStep(
name="prep_data",
script_name="dataprep.py",
compute_target=cluster,
arguments=[input_dataset.as_named_input('raw_data').as_mount(), dataprep_output]
)
注意
向 OutputFileDatasetConfig
进行的并发写入会失败。 不要尝试同时使用单个 OutputFileDatasetConfig
。 不要在多处理情况下共享一个 OutputFileDatasetConfig
,就像使用 分布式训练时一样。
将 OutputFileDatasetConfig
用作训练步骤的输出
在你的管道中PythonScriptStep
,可以通过使用程序的参数来检索可用的输出路径。 如果此步骤是第一步,并且将初始化输出数据,则需要在指定的路径上创建目录。 然后,您可以编写任何要包含在 OutputFileDatasetConfig
中的文件。
parser = argparse.ArgumentParser()
parser.add_argument('--output_path', dest='output_path', required=True)
args = parser.parse_args()
# Make directory for file
os.makedirs(os.path.dirname(args.output_path), exist_ok=True)
with open(args.output_path, 'w') as f:
f.write("Step 1's output")
读取 OutputFileDatasetConfig
作为非初始步骤的输入
当初始管道步骤将一些数据写入到 OutputFileDatasetConfig
路径并且这些数据成为该初始步骤的输出后,可将其用作后面步骤的输入。
在以下代码中:
step1_output_data
指示在上传访问模式下将PythonScriptStep
step1
的输出写入到 Data Lake Storage Gen2 数据存储my_adlsgen2
中。 有关设置角色权限以将数据写回到 Data Lake Storage Gen2 数据存储的信息,请参阅 使用数据存储连接到 Azure 上的存储服务。在
step1
完成并将输出写入step1_output_data
指示的目标后,step2
已准备好将step1_output_data
用作输入。
# Get Data Lake Storage Gen2 datastore that's already registered with the workspace
datastore = workspace.datastores['my_adlsgen2']
step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()
step1 = PythonScriptStep(
name="generate_data",
script_name="step1.py",
runconfig = aml_run_config,
arguments = ["--output_path", step1_output_data]
)
step2 = PythonScriptStep(
name="read_pipeline_data",
script_name="step2.py",
compute_target=compute,
runconfig = aml_run_config,
arguments = ["--pd", step1_output_data.as_input()]
)
pipeline = Pipeline(workspace=ws, steps=[step1, step2])
提示
在 Python 脚本 step2.py
中读取数据的过程与 之前在脚本中的 Access 数据集中所述的过程相同。 使用 ArgumentParser
在脚本中添加 --pd
参数以访问数据。
注册 OutputFileDatasetConfig
对象供重复使用
如果希望某个 OutputFileDatasetConfig
对象在实验结束后依然可用,请将其注册到工作区,以便在不同实验之间进行共享和重复使用。
step1_output_ds = step1_output_data.register_on_complete(
name='processed_data',
description = 'files from step1'
)
不再需要时删除OutputFileDatasetConfig
的内容
Azure 不会自动删除用 OutputFileDatasetConfig
写入的中间数据。 若要避免大量不需要的数据的存储费用,应执行以下作之一:
不再需要中间数据时,可在管道作业结束时以编程方式删除这些数据。
使用 Blob 存储和临时存储策略来管理中间数据。 (请参阅 通过自动执行 Azure Blob 存储访问层来优化成本。此策略只能在工作区的非默认数据存储上设置。 使用
OutputFileDatasetConfig
将中间数据导出到另一个非默认数据存储。# Get Data Lake Storage Gen2 datastore that's already registered with the workspace datastore = workspace.datastores['my_adlsgen2'] step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()
定期查看数据并删除不需要的数据。
警告
仅删除自数据上次更改日期起 30 天后的中间数据。 删除先前的中间数据可能会导致管道运行失败,因为管道假定数据在 30 天内存在以供重复使用。
有关详细信息,请参阅计划管理 Azure 机器学习的成本。