Tutorial: Build an Azure Machine Learning pipeline for image classification
APPLIES TO: Python SDK azureml v1
Note
For a tutorial that uses SDK v2 to build a pipeline, see Tutorial: Use ML pipelines for production ML workflows with Python SDK v2 in a Jupyter Notebook.
In this tutorial, you learn how to build an Azure Machine Learning pipeline to prepare data and train a machine learning model. Machine learning pipelines optimize your workflow with speed, portability, and reuse, so you can focus on machine learning instead of infrastructure and automation.
The example trains a small Keras convolutional neural network to classify images in the Fashion MNIST dataset.
In this tutorial, you complete the following tasks:
- Configure workspace
- Create an Experiment to hold your work
- Provision a ComputeTarget to do the work
- Create a Dataset in which to store compressed data
- Create a pipeline step to prepare the data for training
- Define a runtime Environment in which to perform training
- Create a pipeline step to define the neural network and perform the training
- Compose a Pipeline from the pipeline steps
- Run the pipeline in the experiment
- Review the output of the steps and the trained neural network
- Register the model for further use
If you don't have an Azure subscription, create a free account before you begin. Try the free or paid version of Azure Machine Learning today.
Prerequisites
- Complete Create resources to get started if you don't already have an Azure Machine Learning workspace.
- A Python environment in which you've installed both the
azureml-core
andazureml-pipeline
packages. This environment is for defining and controlling your Azure Machine Learning resources and is separate from the environment used at runtime for training.
Important
Currently, the most recent Python release compatible with azureml-pipeline
is Python 3.8. If you've difficulty installing the azureml-pipeline
package, ensure that python --version
is a compatible release. Consult the documentation of your Python virtual environment manager (venv
, conda
, and so on) for instructions.
Start an interactive Python session
This tutorial uses the Python SDK for Azure Machine Learning to create and control an Azure Machine Learning pipeline. The tutorial assumes that you'll be running the code snippets interactively in either a Python REPL environment or a Jupyter notebook.
- This tutorial is based on the
image-classification.ipynb
notebook found in thepython-sdk/tutorial/using-pipelines
directory of the Azure Machine Learning Examples repository. The source code for the steps themselves is in thekeras-mnist-fashion
subdirectory.
Import types
Import all the Azure Machine Learning types that you'll need for this tutorial:
import os
import azureml.core
from azureml.core import (
Workspace,
Experiment,
Dataset,
Datastore,
ComputeTarget,
Environment,
ScriptRunConfig
)
from azureml.data import OutputFileDatasetConfig
from azureml.core.compute import AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline
# check core SDK version number
print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
The Azure Machine Learning SDK version should be 1.37 or greater. If it isn't, upgrade with pip install --upgrade azureml-core
.
Configure workspace
Create a workspace object from the existing Azure Machine Learning workspace.
workspace = Workspace.from_config()
Important
This code snippet expects the workspace configuration to be saved in the current directory or its parent. For more information on creating a workspace, see Create workspace resources. For more information on saving the configuration to file, see Create a workspace configuration file.
Create the infrastructure for your pipeline
Create an Experiment
object to hold the results of your pipeline runs:
exp = Experiment(workspace=workspace, name="keras-mnist-fashion")
Create a ComputeTarget
that represents the machine resource on which your pipeline will run. The simple neural network used in this tutorial trains in just a few minutes even on a CPU-based machine. If you wish to use a GPU for training, set use_gpu
to True
. Provisioning a compute target generally takes about five minutes.
use_gpu = False
# choose a name for your cluster
cluster_name = "gpu-cluster" if use_gpu else "cpu-cluster"
found = False
# Check if this compute target already exists in the workspace.
cts = workspace.compute_targets
if cluster_name in cts and cts[cluster_name].type == "AmlCompute":
found = True
print("Found existing compute target.")
compute_target = cts[cluster_name]
if not found:
print("Creating a new compute target...")
compute_config = AmlCompute.provisioning_configuration(
vm_size= "STANDARD_NC6" if use_gpu else "STANDARD_D2_V2"
# vm_priority = 'lowpriority', # optional
max_nodes=4,
)
# Create the cluster.
compute_target = ComputeTarget.create(workspace, cluster_name, compute_config)
# Can poll for a minimum number of nodes and for a specific timeout.
# If no min_node_count is provided, it will use the scale settings for the cluster.
compute_target.wait_for_completion(
show_output=True, min_node_count=None, timeout_in_minutes=10
)
# For a more detailed view of current AmlCompute status, use get_status().print(compute_target.get_status().serialize())
Note
GPU availability depends on the quota of your Azure subscription and upon Azure capacity. See Manage and increase quotas for resources with Azure Machine Learning.
Create a dataset for the Azure-stored data
Fashion-MNIST is a dataset of fashion images divided into 10 classes. Each image is a 28x28 grayscale image and there are 60,000 training and 10,000 test images. As an image classification problem, Fashion-MNIST is harder than the classic MNIST handwritten digit database. It's distributed in the same compressed binary form as the original handwritten digit database .
To create a Dataset
that references the Web-based data, run:
data_urls = ["https://data4mldemo6150520719.blob.core.windows.net/demo/mnist-fashion"]
fashion_ds = Dataset.File.from_files(data_urls)
# list the files referenced by fashion_ds
print(fashion_ds.to_path())
This code completes quickly. The underlying data remains in the Azure storage resource specified in the data_urls
array.
Create the data-preparation pipeline step
The first step in this pipeline will convert the compressed data files of fashion_ds
into a dataset in your own workspace consisting of CSV files ready for use in training. Once registered with the workspace, your collaborators can access this data for their own analysis, training, and so on
datastore = workspace.get_default_datastore()
prepared_fashion_ds = OutputFileDatasetConfig(
destination=(datastore, "outputdataset/{run-id}")
).register_on_complete(name="prepared_fashion_ds")
The above code specifies a dataset that is based on the output of a pipeline step. The underlying processed files will be put in the workspace's default datastore's blob storage at the path specified in destination
. The dataset will be registered in the workspace with the name prepared_fashion_ds
.
Create the pipeline step's source
The code that you've executed so far has create and controlled Azure resources. Now it's time to write code that does the first step in the domain.
If you're following along with the example in the Azure Machine Learning Examples repo, the source file is already available as keras-mnist-fashion/prepare.py
.
If you're working from scratch, create a subdirectory called keras-mnist-fashion/
. Create a new file, add the following code to it, and name the file prepare.py
.
# prepare.py
# Converts MNIST-formatted files at the passed-in input path to a passed-in output path
import os
import sys
# Conversion routine for MNIST binary format
def convert(imgf, labelf, outf, n):
f = open(imgf, "rb")
l = open(labelf, "rb")
o = open(outf, "w")
f.read(16)
l.read(8)
images = []
for i in range(n):
image = [ord(l.read(1))]
for j in range(28 * 28):
image.append(ord(f.read(1)))
images.append(image)
for image in images:
o.write(",".join(str(pix) for pix in image) + "\n")
f.close()
o.close()
l.close()
# The MNIST-formatted source
mounted_input_path = sys.argv[1]
# The output directory at which the outputs will be written
mounted_output_path = sys.argv[2]
# Create the output directory
os.makedirs(mounted_output_path, exist_ok=True)
# Convert the training data
convert(
os.path.join(mounted_input_path, "mnist-fashion/train-images-idx3-ubyte"),
os.path.join(mounted_input_path, "mnist-fashion/train-labels-idx1-ubyte"),
os.path.join(mounted_output_path, "mnist_train.csv"),
60000,
)
# Convert the test data
convert(
os.path.join(mounted_input_path, "mnist-fashion/t10k-images-idx3-ubyte"),
os.path.join(mounted_input_path, "mnist-fashion/t10k-labels-idx1-ubyte"),
os.path.join(mounted_output_path, "mnist_test.csv"),
10000,
)
The code in prepare.py
takes two command-line arguments: the first is assigned to mounted_input_path
and the second to mounted_output_path
. If that subdirectory doesn't exist, the call to os.makedirs
creates it. Then, the program converts the training and testing data and outputs the comma-separated files to the mounted_output_path
.
Specify the pipeline step
Back in the Python environment you're using to specify the pipeline, run this code to create a PythonScriptStep
for your preparation code:
script_folder = "./keras-mnist-fashion"
prep_step = PythonScriptStep(
name="prepare step",
script_name="prepare.py",
# On the compute target, mount fashion_ds dataset as input, prepared_fashion_ds as output
arguments=[fashion_ds.as_named_input("fashion_ds").as_mount(), prepared_fashion_ds],
source_directory=script_folder,
compute_target=compute_target,
allow_reuse=True,
)
The call to PythonScriptStep
specifies that, when the pipeline step is run:
- All the files in the
script_folder
directory are uploaded to thecompute_target
- Among those uploaded source files, the file
prepare.py
will be run - The
fashion_ds
andprepared_fashion_ds
datasets will be mounted on thecompute_target
and appear as directories - The path to the
fashion_ds
files will be the first argument toprepare.py
. Inprepare.py
, this argument is assigned tomounted_input_path
- The path to the
prepared_fashion_ds
will be the second argument toprepare.py
. Inprepare.py
, this argument is assigned tomounted_output_path
- Because
allow_reuse
isTrue
, it won't be rerun until its source files or inputs change - This
PythonScriptStep
will be namedprepare step
Modularity and reuse are key benefits of pipelines. Azure Machine Learning can automatically determine source code or Dataset changes. The output of a step that isn't affected will be reused without rerunning the steps again if allow_reuse
is True
. If a step relies on a data source external to Azure Machine Learning that may change (for instance, a URL that contains sales data), set allow_reuse
to False
and the pipeline step will run every time the pipeline is run.
Create the training step
Once the data has been converted from the compressed format to CSV files, it can be used for training a convolutional neural network.
Create the training step's source
With larger pipelines, it's a good practice to put each step's source code in a separate directory (src/prepare/
, src/train/
, and so on) but for this tutorial, just use or create the file train.py
in the same keras-mnist-fashion/
source directory.
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.layers.normalization import BatchNormalization
from keras.utils import to_categorical
from keras.callbacks import Callback
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from azureml.core import Run
# dataset object from the run
run = Run.get_context()
dataset = run.input_datasets["prepared_fashion_ds"]
# split dataset into train and test set
(train_dataset, test_dataset) = dataset.random_split(percentage=0.8, seed=111)
# load dataset into pandas dataframe
data_train = train_dataset.to_pandas_dataframe()
data_test = test_dataset.to_pandas_dataframe()
img_rows, img_cols = 28, 28
input_shape = (img_rows, img_cols, 1)
X = np.array(data_train.iloc[:, 1:])
y = to_categorical(np.array(data_train.iloc[:, 0]))
# here we split validation data to optimiza classifier during training
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=13)
# test data
X_test = np.array(data_test.iloc[:, 1:])
y_test = to_categorical(np.array(data_test.iloc[:, 0]))
X_train = (
X_train.reshape(X_train.shape[0], img_rows, img_cols, 1).astype("float32") / 255
)
X_test = X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
X_val = X_val.reshape(X_val.shape[0], img_rows, img_cols, 1).astype("float32") / 255
batch_size = 256
num_classes = 10
epochs = 10
# construct neuron network
model = Sequential()
model.add(
Conv2D(
32,
kernel_size=(3, 3),
activation="relu",
kernel_initializer="he_normal",
input_shape=input_shape,
)
)
model.add(MaxPooling2D((2, 2)))
model.add(Dropout(0.25))
model.add(Conv2D(64, (3, 3), activation="relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Conv2D(128, (3, 3), activation="relu"))
model.add(Dropout(0.4))
model.add(Flatten())
model.add(Dense(128, activation="relu"))
model.add(Dropout(0.3))
model.add(Dense(num_classes, activation="softmax"))
model.compile(
loss=keras.losses.categorical_crossentropy,
optimizer=keras.optimizers.Adam(),
metrics=["accuracy"],
)
# start an Azure ML run
run = Run.get_context()
class LogRunMetrics(Callback):
# callback at the end of every epoch
def on_epoch_end(self, epoch, log):
# log a value repeated which creates a list
run.log("Loss", log["loss"])
run.log("Accuracy", log["accuracy"])
history = model.fit(
X_train,
y_train,
batch_size=batch_size,
epochs=epochs,
verbose=1,
validation_data=(X_val, y_val),
callbacks=[LogRunMetrics()],
)
score = model.evaluate(X_test, y_test, verbose=0)
# log a single value
run.log("Final test loss", score[0])
print("Test loss:", score[0])
run.log("Final test accuracy", score[1])
print("Test accuracy:", score[1])
plt.figure(figsize=(6, 3))
plt.title("Fashion MNIST with Keras ({} epochs)".format(epochs), fontsize=14)
plt.plot(history.history["accuracy"], "b-", label="Accuracy", lw=4, alpha=0.5)
plt.plot(history.history["loss"], "r--", label="Loss", lw=4, alpha=0.5)
plt.legend(fontsize=12)
plt.grid(True)
# log an image
run.log_image("Loss v.s. Accuracy", plot=plt)
# create a ./outputs/model folder in the compute target
# files saved in the "./outputs" folder are automatically uploaded into run history
os.makedirs("./outputs/model", exist_ok=True)
# serialize NN architecture to JSON
model_json = model.to_json()
# save model JSON
with open("./outputs/model/model.json", "w") as f:
f.write(model_json)
# save model weights
model.save_weights("./outputs/model/model.h5")
print("model saved in ./outputs/model folder")
Most of this code should be familiar to ML developers:
- The data is partitioned into train and validation sets for training, and a separate test subset for final scoring
- The input shape is 28x28x1 (only 1 because the input is grayscale), there will be 256 inputs in a batch, and there are 10 classes
- The number of training epochs will be 10
- The model has three convolutional layers, with max pooling and dropout, followed by a dense layer and softmax head
- The model is fitted for 10 epochs and then evaluated
- The model architecture is written to
outputs/model/model.json
and the weights tooutputs/model/model.h5
Some of the code, though, is specific to Azure Machine Learning. run = Run.get_context()
retrieves a Run
object, which contains the current service context. The train.py
source uses this run
object to retrieve the input dataset via its name (an alternative to the code in prepare.py
that retrieved the dataset via the argv
array of script arguments).
The run
object is also used to log the training progress at the end of every epoch and, at the end of training, to log the graph of loss and accuracy over time.
Create the training pipeline step
The training step has a slightly more complex configuration than the preparation step. The preparation step used only standard Python libraries. More commonly, you'll need to modify the runtime environment in which your source code runs.
Create a file conda_dependencies.yml
with the following contents:
dependencies:
- python=3.7
- pip:
- azureml-core
- azureml-dataset-runtime
- keras==2.4.3
- tensorflow==2.4.3
- numpy
- scikit-learn
- pandas
- matplotlib
The Environment
class represents the runtime environment in which a machine learning task runs. Associate the above specification with the training code with:
keras_env = Environment.from_conda_specification(
name="keras-env", file_path="./conda_dependencies.yml"
)
train_cfg = ScriptRunConfig(
source_directory=script_folder,
script="train.py",
compute_target=compute_target,
environment=keras_env,
)
Creating the training step itself uses code similar to the code used to create the preparation step:
train_step = PythonScriptStep(
name="train step",
arguments=[
prepared_fashion_ds.read_delimited_files().as_input(name="prepared_fashion_ds")
],
source_directory=train_cfg.source_directory,
script_name=train_cfg.script,
runconfig=train_cfg.run_config,
)
Create and run the pipeline
Now that you've specified data inputs and outputs and created your pipeline's steps, you can compose them into a pipeline and run it:
pipeline = Pipeline(workspace, steps=[prep_step, train_step])
run = exp.submit(pipeline)
The Pipeline
object you create runs in your workspace
and is composed of the preparation and training steps you've specified.
Note
This pipeline has a simple dependency graph: the training step relies on the preparation step and the preparation step relies on the fashion_ds
dataset. Production pipelines will often have much more complex dependencies. Steps may rely on multiple upstream steps, a source code change in an early step may have far-reaching consequences, and so on. Azure Machine Learning tracks these concerns for you. You need only pass in the array of steps
and Azure Machine Learning takes care of calculating the execution graph.
The call to submit
the Experiment
completes quickly, and produces output similar to:
Submitted PipelineRun 5968530a-abcd-1234-9cc1-46168951b5eb
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/abc-xyz...
You can monitor the pipeline run by opening the link or you can block until it completes by running:
run.wait_for_completion(show_output=True)
Important
The first pipeline run takes roughly 15 minutes. All dependencies must be downloaded, a Docker image is created, and the Python environment is provisioned and created. Running the pipeline again takes significantly less time because those resources are reused instead of created. However, total run time for the pipeline depends on the workload of your scripts and the processes that are running in each pipeline step.
Once the pipeline completes, you can retrieve the metrics you logged in the training step:
run.find_step_run("train step")[0].get_metrics()
If you're satisfied with the metrics, you can register the model in your workspace:
run.find_step_run("train step")[0].register_model(
model_name="keras-model",
model_path="outputs/model/",
datasets=[("train test data", fashion_ds)],
)
Clean up resources
Don't complete this section if you plan to run other Azure Machine Learning tutorials.
Stop the compute instance
If you used a compute instance, stop the VM when you aren't using it to reduce cost.
In your workspace, select Compute.
From the list, select the name of the compute instance.
Select Stop.
When you're ready to use the server again, select Start.
Delete everything
If you don't plan to use the resources you created, delete them, so you don't incur any charges:
- In the Azure portal, in the left menu, select Resource groups.
- In the list of resource groups, select the resource group you created.
- Select Delete resource group.
- Enter the resource group name. Then, select Delete.
You can also keep the resource group but delete a single workspace. Display the workspace properties, and then select Delete.
Next steps
In this tutorial, you used the following types:
- The
Workspace
represents your Azure Machine Learning workspace. It contained:- The
Experiment
that contains the results of training runs of your pipeline - The
Dataset
that lazily loaded the data held in the Fashion-MNIST datastore - The
ComputeTarget
that represents the machine(s) on which the pipeline steps run - The
Environment
that is the runtime environment in which the pipeline steps run - The
Pipeline
that composes thePythonScriptStep
steps into a whole - The
Model
that you registered after being satisfied with the training process
- The
The Workspace
object contains references to other resources (notebooks, endpoints, and so on) that weren't used in this tutorial. For more, see What is an Azure Machine Learning workspace?.
The OutputFileDatasetConfig
promotes the output of a run to a file-based dataset. For more information on datasets and working with data, see How to access data.
For more on compute targets and environments, see What are compute targets in Azure Machine Learning? and What are Azure Machine Learning environments?
The ScriptRunConfig
associates a ComputeTarget
and Environment
with Python source files. A PythonScriptStep
takes that ScriptRunConfig
and defines its inputs and outputs, which in this pipeline was the file dataset built by the OutputFileDatasetConfig
.
For more examples of how to build pipelines by using the machine learning SDK, see the example repository.