你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

教程:生成用于图像分类的 Azure 机器学习管道

适用于:Python SDK azureml v1

本教程介绍如何生成 Azure 机器学习管道来准备数据和训练机器学习模型。 机器学习管道可以优化工作流以提高其速度、可移植性和可重用性,使你能够将工作重心放在机器学习上,而不必关注基础结构和自动化。

该示例将训练一个小型 Keras 卷积神经网络,以对 Fashion MNIST 数据集中的图像进行分类。

在本教程中,请完成以下任务:

  • 配置工作区
  • 创建试验来保存工作
  • 预配 ComputeTarget 以执行该工作
  • 创建用于存储压缩数据的数据集
  • 创建管道步骤以准备要训练的数据
  • 定义执行训练的运行时环境
  • 创建管道步骤以定义神经网络并执行训练
  • 通过管道步骤撰写管道
  • 在试验中运行管道
  • 查看步骤的输出和经训练的神经网络
  • 注册模型供进一步使用

如果没有 Azure 订阅,请在开始操作前先创建一个免费帐户。 立即试用免费版或付费版 Azure 机器学习

先决条件

  • 如果还没有 Azure 机器学习工作区,请完成创建帮助入门的资源
  • 已在其中安装 azureml-coreazureml-pipeline 包的 Python 环境。 此环境用于定义和控制 Azure 机器学习资源,独立于运行时用于训练的环境。

重要

目前,与 azureml-pipeline 兼容的最新 Python 版本是 Python 3.8。 如果在安装 azureml-pipeline 包时遇到困难,请确保 python --version 是兼容版本。 有关说明,请参阅 Python 虚拟环境管理器(venvconda 等)的文档。

启动交互式 Python 会话

本教程使用适用于 Azure 机器学习的 Python SDK 创建和控制 Azure 机器学习管道。 本教程假定你将在 Python REPL 环境或 Jupyter 笔记本中以交互方式运行代码片段。

  • 本教程基于 Azure 机器学习示例 存储库的 python-sdk/tutorial/using-pipelines 目录中的 image-classification.ipynb 笔记本。 步骤本身的源代码位于 keras-mnist-fashion 子目录中。

导入类型

导入本教程所需的所有 Azure 机器学习类型:

import os
import azureml.core
from azureml.core import (
    Workspace,
    Experiment,
    Dataset,
    Datastore,
    ComputeTarget,
    Environment,
    ScriptRunConfig
)
from azureml.data import OutputFileDatasetConfig
from azureml.core.compute import AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline

# check core SDK version number
print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)

Azure 机器学习 SDK 版本应为 1.37 或更高版本。 如果不是,请使用 pip install --upgrade azureml-core 进行升级。

配置工作区

从现有的 Azure 机器学习工作区创建工作区对象。

workspace = Workspace.from_config()

重要

此代码片段需要将工作区配置保存到当前目录或其父目录中。 若要详细了解如何创建工作区,请参阅创建工作区资源。 有关将配置保存到文件的详细信息,请参阅创建工作区配置文件

为管道创建基础结构

创建一个 Experiment 对象来保存管道运行的结果:

exp = Experiment(workspace=workspace, name="keras-mnist-fashion")

创建一个 ComputeTarget,表示管道将在其上运行的计算机资源。 即使在基于 CPU 的计算机上,本教程中使用的简单神经网络也只需几分钟即可完成训练。 如果要使用 GPU 进行训练,请将 use_gpu 设置为 True。 预配计算目标通常需要大约五分钟。

use_gpu = False

# choose a name for your cluster
cluster_name = "gpu-cluster" if use_gpu else "cpu-cluster"

found = False
# Check if this compute target already exists in the workspace.
cts = workspace.compute_targets
if cluster_name in cts and cts[cluster_name].type == "AmlCompute":
    found = True
    print("Found existing compute target.")
    compute_target = cts[cluster_name]
if not found:
    print("Creating a new compute target...")
    compute_config = AmlCompute.provisioning_configuration(
        vm_size= "STANDARD_NC6" if use_gpu else "STANDARD_D2_V2"
        # vm_priority = 'lowpriority', # optional
        max_nodes=4,
    )

    # Create the cluster.
    compute_target = ComputeTarget.create(workspace, cluster_name, compute_config)

    # Can poll for a minimum number of nodes and for a specific timeout.
    # If no min_node_count is provided, it will use the scale settings for the cluster.
    compute_target.wait_for_completion(
        show_output=True, min_node_count=None, timeout_in_minutes=10
    )
# For a more detailed view of current AmlCompute status, use get_status().print(compute_target.get_status().serialize())

备注

GPU 可用性取决于 Azure 订阅的配额和 Azure 容量。 请参阅管理和增大 Azure 机器学习资源的配额

为 Azure 存储的数据创建数据集

Fashion-MNIST 是一个时尚图像数据集,包含 10 个类别。 每张图像都是 28x28 的灰度图像,有 60,000 张训练图像和 10,000 张测试图像。 作为图像分类问题,Fashion-MNIST 比经典 MNIST 手写数字数据库更难。 它以与原始手写数字数据库相同的压缩二进制形式分发。

若要创建引用基于 Web 的数据的 Dataset,请运行:

data_urls = ["https://data4mldemo6150520719.blob.core.windows.net/demo/mnist-fashion"]
fashion_ds = Dataset.File.from_files(data_urls)

# list the files referenced by fashion_ds
print(fashion_ds.to_path())

此代码将快速完成。 基础数据保留在 data_urls 数组中指定的 Azure 存储资源中。

创建数据准备管道步骤

此管道的第一步是将 fashion_ds 的压缩数据文件转换为你自己的工作区中的数据集,其中包含可供训练使用的 CSV 文件。 向工作区注册后,你的协作者可以访问此数据进行自己的分析、训练等

datastore = workspace.get_default_datastore()
prepared_fashion_ds = OutputFileDatasetConfig(
    destination=(datastore, "outputdataset/{run-id}")
).register_on_complete(name="prepared_fashion_ds")

上述代码指定了一个基于管道步骤输出的数据集。 基础已处理文件将放入工作区的默认数据存储的 blob 存储中,位于 destination 中指定的路径。 数据集将在名为 prepared_fashion_ds 的工作区中注册。

创建管道步骤的源

到目前为止,执行的代码已创建并控制了 Azure 资源。 现在,可以编写在域中执行第一步的代码。

如果按照 Azure 机器学习示例存储库中的示例进行操作,则源文件已作为 keras-mnist-fashion/prepare.py 提供。

如果是从头开始操作,请创建名为 keras-mnist-fashion/ 的子目录。 创建一个新文件,将以下代码添加到其中,并将文件命名为 prepare.py

# prepare.py
# Converts MNIST-formatted files at the passed-in input path to a passed-in output path
import os
import sys

# Conversion routine for MNIST binary format
def convert(imgf, labelf, outf, n):
    f = open(imgf, "rb")
    l = open(labelf, "rb")
    o = open(outf, "w")

    f.read(16)
    l.read(8)
    images = []

    for i in range(n):
        image = [ord(l.read(1))]
        for j in range(28 * 28):
            image.append(ord(f.read(1)))
        images.append(image)

    for image in images:
        o.write(",".join(str(pix) for pix in image) + "\n")
    f.close()
    o.close()
    l.close()

# The MNIST-formatted source
mounted_input_path = sys.argv[1]
# The output directory at which the outputs will be written
mounted_output_path = sys.argv[2]

# Create the output directory
os.makedirs(mounted_output_path, exist_ok=True)

# Convert the training data
convert(
    os.path.join(mounted_input_path, "mnist-fashion/train-images-idx3-ubyte"),
    os.path.join(mounted_input_path, "mnist-fashion/train-labels-idx1-ubyte"),
    os.path.join(mounted_output_path, "mnist_train.csv"),
    60000,
)

# Convert the test data
convert(
    os.path.join(mounted_input_path, "mnist-fashion/t10k-images-idx3-ubyte"),
    os.path.join(mounted_input_path, "mnist-fashion/t10k-labels-idx1-ubyte"),
    os.path.join(mounted_output_path, "mnist_test.csv"),
    10000,
)

prepare.py 中的代码采用两个命令行参数:第一个分配给 mounted_input_path,第二个分配给 mounted_output_path。 如果该子目录不存在,则调用 os.makedirs 会创建该目录。 然后,程序将转换训练和测试数据,并将逗号分隔的文件输出到 mounted_output_path

指定管道步骤

返回用于指定管道的 Python 环境,运行以下代码为准备代码创建 PythonScriptStep

script_folder = "./keras-mnist-fashion"

prep_step = PythonScriptStep(
    name="prepare step",
    script_name="prepare.py",
    # On the compute target, mount fashion_ds dataset as input, prepared_fashion_ds as output
    arguments=[fashion_ds.as_named_input("fashion_ds").as_mount(), prepared_fashion_ds],
    source_directory=script_folder,
    compute_target=compute_target,
    allow_reuse=True,
)

PythonScriptStep 的调用指定在运行管道步骤时:

  • script_folder 目录中的所有文件都上传到 compute_target
  • 在这些上传的源文件中,将运行文件 prepare.py
  • fashion_dsprepared_fashion_ds 数据集将装载在 compute_target 上,并显示为目录
  • fashion_ds 文件的路径将是 prepare.py 的第一个参数。 在 prepare.py 中,此参数分配给 mounted_input_path
  • prepared_fashion_ds 的路径将是 prepare.py 的第二个参数。 在 prepare.py 中,此参数分配给 mounted_output_path
  • 因为 allow_reuseTrue,所以在其源文件或输入更改之前,它不会重新运行
  • PythonScriptStep 将被命名为 prepare step

模块化和重用是管道的主要优势。 Azure 机器学习可自动确定源代码或数据集更改。 如果 allow_reuseTrue,则将重用不受影响的步骤的输出,而不会再次重新运行这些步骤。 如果某个步骤依赖于 Azure 机器学习外部可能发生变化的数据源(例如,包含销售数据的 URL),请将 allow_reuse 设置为 False,在每次运行管道时都运行管道步骤。

创建训练步骤

数据从压缩格式转换为 CSV 文件后,可用于训练卷积神经网络。

创建训练步骤的源

使用较大的管道时,最佳做法是将每个步骤的源代码放在单独的目录(src/prepare/src/train/ 等)中,但对于本教程,只需在同一 keras-mnist-fashion/ 源目录中使用或创建文件 train.py

import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.layers.normalization import BatchNormalization
from keras.utils import to_categorical
from keras.callbacks import Callback

import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from azureml.core import Run

# dataset object from the run
run = Run.get_context()
dataset = run.input_datasets["prepared_fashion_ds"]

# split dataset into train and test set
(train_dataset, test_dataset) = dataset.random_split(percentage=0.8, seed=111)

# load dataset into pandas dataframe
data_train = train_dataset.to_pandas_dataframe()
data_test = test_dataset.to_pandas_dataframe()

img_rows, img_cols = 28, 28
input_shape = (img_rows, img_cols, 1)

X = np.array(data_train.iloc[:, 1:])
y = to_categorical(np.array(data_train.iloc[:, 0]))

# here we split validation data to optimiza classifier during training
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=13)

# test data
X_test = np.array(data_test.iloc[:, 1:])
y_test = to_categorical(np.array(data_test.iloc[:, 0]))


X_train = (
    X_train.reshape(X_train.shape[0], img_rows, img_cols, 1).astype("float32") / 255
)
X_test = X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
X_val = X_val.reshape(X_val.shape[0], img_rows, img_cols, 1).astype("float32") / 255

batch_size = 256
num_classes = 10
epochs = 10

# construct neuron network
model = Sequential()
model.add(
    Conv2D(
        32,
        kernel_size=(3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        input_shape=input_shape,
    )
)
model.add(MaxPooling2D((2, 2)))
model.add(Dropout(0.25))
model.add(Conv2D(64, (3, 3), activation="relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Conv2D(128, (3, 3), activation="relu"))
model.add(Dropout(0.4))
model.add(Flatten())
model.add(Dense(128, activation="relu"))
model.add(Dropout(0.3))
model.add(Dense(num_classes, activation="softmax"))

model.compile(
    loss=keras.losses.categorical_crossentropy,
    optimizer=keras.optimizers.Adam(),
    metrics=["accuracy"],
)

# start an Azure ML run
run = Run.get_context()


class LogRunMetrics(Callback):
    # callback at the end of every epoch
    def on_epoch_end(self, epoch, log):
        # log a value repeated which creates a list
        run.log("Loss", log["loss"])
        run.log("Accuracy", log["accuracy"])


history = model.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    verbose=1,
    validation_data=(X_val, y_val),
    callbacks=[LogRunMetrics()],
)

score = model.evaluate(X_test, y_test, verbose=0)

# log a single value
run.log("Final test loss", score[0])
print("Test loss:", score[0])

run.log("Final test accuracy", score[1])
print("Test accuracy:", score[1])

plt.figure(figsize=(6, 3))
plt.title("Fashion MNIST with Keras ({} epochs)".format(epochs), fontsize=14)
plt.plot(history.history["accuracy"], "b-", label="Accuracy", lw=4, alpha=0.5)
plt.plot(history.history["loss"], "r--", label="Loss", lw=4, alpha=0.5)
plt.legend(fontsize=12)
plt.grid(True)

# log an image
run.log_image("Loss v.s. Accuracy", plot=plt)

# create a ./outputs/model folder in the compute target
# files saved in the "./outputs" folder are automatically uploaded into run history
os.makedirs("./outputs/model", exist_ok=True)

# serialize NN architecture to JSON
model_json = model.to_json()
# save model JSON
with open("./outputs/model/model.json", "w") as f:
    f.write(model_json)
# save model weights
model.save_weights("./outputs/model/model.h5")
print("model saved in ./outputs/model folder")

ML 开发人员应熟悉这些代码的大部分内容:

  • 数据已分区为用于训练的训练集和验证集,以及用于最终评分的单独测试子集
  • 输入形状为 28x28x1(仅为 1,因为输入是灰度),一个批中将包含 256 个输入,共有 10 个类
  • 训练循环数为 10
  • 该模型有三个卷积层,包括最大池化和随机失活,后跟全连接层和 softmax 头
  • 该模型适合 10 个循环,然后进行评估
  • 模型体系结构写入 outputs/model/model.json,权重写入 outputs/model/model.h5

不过,某些代码特定于 Azure 机器学习。 run = Run.get_context() 检索包含当前服务上下文的 Run 对象。 train.py 源使用此 run 对象通过其名称检索输入数据集(替代 prepare.py 中通过脚本参数数组 argv 检索数据集的代码)。

run 对象还用于在每个循环结束时记录训练进度,并在训练结束时记录损失和准确度随时间变化的图表。

创建训练管道步骤

训练步骤的配置比准备步骤稍微复杂一些。 准备步骤仅使用标准 Python 库。 更常见的是,需要修改运行源代码的运行时环境。

创建具有以下内容的文件 conda_dependencies.yml

dependencies:
- python=3.7
- pip:
  - azureml-core
  - azureml-dataset-runtime
  - keras==2.4.3
  - tensorflow==2.4.3
  - numpy
  - scikit-learn
  - pandas
  - matplotlib

Environment 类表示运行机器学习任务的运行时环境。 将上述规范与训练代码相关联:

keras_env = Environment.from_conda_specification(
    name="keras-env", file_path="./conda_dependencies.yml"
)

train_cfg = ScriptRunConfig(
    source_directory=script_folder,
    script="train.py",
    compute_target=compute_target,
    environment=keras_env,
)

创建训练步骤本身使用的代码类似于用于创建准备步骤的代码:

train_step = PythonScriptStep(
    name="train step",
    arguments=[
        prepared_fashion_ds.read_delimited_files().as_input(name="prepared_fashion_ds")
    ],
    source_directory=train_cfg.source_directory,
    script_name=train_cfg.script,
    runconfig=train_cfg.run_config,
)

创建并运行管道

现在,你已指定数据输入和输出并创建了管道的步骤,可以将它们组合到管道中并运行管道:

pipeline = Pipeline(workspace, steps=[prep_step, train_step])
run = exp.submit(pipeline)

你创建的 Pipeline 对象在 workspace 中运行,由指定的准备和训练步骤组成。

备注

此管道有一个简单的依赖项关系图:训练步骤依赖于准备步骤,准备步骤依赖于 fashion_ds 数据集。 生产管道通常具有更复杂的依赖项。 步骤可能依赖于多个上游步骤,早期步骤中的源代码更改可能会产生深远的影响,等等。 Azure 机器学习会为你跟踪这些问题。 你只需传入 steps 数组,Azure 机器学习会负责计算执行图。

submitExperiment 的调用很快完成,并生成类似于以下内容的输出:

Submitted PipelineRun 5968530a-abcd-1234-9cc1-46168951b5eb
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/abc-xyz...

可以通过打开链接来监视管道运行,也可以通过运行以下代码来阻止管道运行,直到管道运行完成:

run.wait_for_completion(show_output=True)

重要

首次管道运行需要大约 15 分钟。 必须下载所有依赖项、创建 Docker 映像,并预配和创建 Python 环境。 再次运行管道所花费的时间会大幅减少,因为会重复使用这些资源,而无需再次创建。 但是,管道的总运行时间取决于脚本的工作负荷,以及每个管道步骤中运行的进程数。

管道完成后,可以检索在训练步骤中记录的指标:

run.find_step_run("train step")[0].get_metrics()

如果对指标感到满意,可以在工作区中注册模型:

run.find_step_run("train step")[0].register_model(
    model_name="keras-model",
    model_path="outputs/model/",
    datasets=[("train test data", fashion_ds)],
)

清理资源

如果你打算运行其他 Azure 机器学习教程,请不要完成本部分。

停止计算实例

如果使用了计算实例,请在不使用 VM 时将其停止,以降低成本。

  1. 在工作区中选择“计算”。

  2. 从列表中选择计算实例的名称。

  3. 选择“停止” 。

  4. 准备好再次使用服务器时,选择“启动” 。

删除所有内容

如果不打算使用已创建的资源,请删除它们,以免产生任何费用:

  1. 在 Azure 门户的左侧菜单中选择“资源组”。
  2. 在资源组列表中,选择创建的资源组。
  3. 选择“删除资源组”。
  4. 输入资源组名称。 然后选择“删除”。

还可保留资源组,但请删除单个工作区。 显示工作区属性,然后选择“删除”。

后续步骤

在本教程中,你使用了以下类型:

  • Workspace 代表你的 Azure 机器学习工作区。 其中包含:
    • 包含管道训练运行结果的 Experiment
    • 延迟加载 Fashion-MNIST 数据存储中保存的数据的 Dataset
    • 表示运行管道步骤的计算机的 ComputeTarget
    • 运行管道步骤的运行时环境 Environment
    • PythonScriptStep 步骤组合成一个整体的 Pipeline
    • 对训练过程满意后注册的 Model

Workspace 对象包含对本教程中未使用的其他资源(笔记本、终结点等)的引用。 有关详细信息,请参阅什么是 Azure 机器学习工作区?

OutputFileDatasetConfig 将运行的输出提升为基于文件的数据集。 有关数据集和处理数据的详细信息,请参阅如何访问数据

有关计算目标和环境的详细信息,请参阅什么是 Azure 机器学习中的计算目标?什么是 Azure 机器学习环境?

ScriptRunConfigComputeTargetEnvironment 与 Python 源文件相关联。 PythonScriptStep 采用该 ScriptRunConfig 并定义其输入和输出,在此管道中这是由 OutputFileDatasetConfig 生成的文件数据集。

有关如何使用机器学习 SDK 生成管道的更多示例,请参阅示例存储库