Create and run machine learning pipelines using components with the Azure Machine Learning SDK v2
APPLIES TO: Python SDK azure-ai-ml v2 (current)
In this article, you learn how to build an Azure Machine Learning pipeline using Python SDK v2 to complete an image classification task containing three steps: prepare data, train an image classification model, and score the model. Machine learning pipelines optimize your workflow with speed, portability, and reuse, so you can focus on machine learning instead of infrastructure and automation.
The example trains a small Keras convolutional neural network to classify images in the Fashion MNIST dataset. The pipeline looks like following.
In this article, you complete the following tasks:
- Prepare input data for the pipeline job
- Create three components to prepare the data, train and score
- Compose a Pipeline from the components
- Get access to workspace with compute
- Submit the pipeline job
- Review the output of the components and the trained neural network
- (Optional) Register the component for further reuse and sharing within workspace
If you don't have an Azure subscription, create a free account before you begin. Try the free or paid version of Azure Machine Learning today.
Prerequisites
Azure Machine Learning workspace - if you don't have one, complete the Create resources tutorial.
A Python environment in which you've installed Azure Machine Learning Python SDK v2 - install instructions - check the getting started section. This environment is for defining and controlling your Azure Machine Learning resources and is separate from the environment used at runtime for training.
Clone examples repository
To run the training examples, first clone the examples repository and change into the
sdk
directory:git clone --depth 1 https://github.com/Azure/azureml-examples cd azureml-examples/sdk
Start an interactive Python session
This article uses the Python SDK for Azure Machine Learning to create and control an Azure Machine Learning pipeline. The article assumes that you'll be running the code snippets interactively in either a Python REPL environment or a Jupyter notebook.
This article is based on the image_classification_keras_minist_convnet.ipynb notebook found in the sdk/python/jobs/pipelines/2e_image_classification_keras_minist_convnet
directory of the Azure Machine Learning Examples repository.
Import required libraries
Import all the Azure Machine Learning required libraries that you'll need for this article:
# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component
Prepare input data for your pipeline job
You need to prepare the input data for this image classification pipeline.
Fashion-MNIST is a dataset of fashion images divided into 10 classes. Each image is a 28x28 grayscale image and there are 60,000 training and 10,000 test images. As an image classification problem, Fashion-MNIST is harder than the classic MNIST handwritten digit database. It's distributed in the same compressed binary form as the original handwritten digit database.
Import all the Azure Machine Learning required libraries that you'll need.
By defining an Input
, you create a reference to the data source location. The data remains in its existing location, so no extra storage cost is incurred.
Create components for building pipeline
The image classification task can be split into three steps: prepare data, train model and score model.
Azure Machine Learning component is a self-contained piece of code that does one step in a machine learning pipeline. In this article, you'll create three components for the image classification task:
- Prepare data for training and test
- Train a neural network for image classification using training data
- Score the model using test data
For each component, you need to prepare the following:
Prepare the Python script containing the execution logic
Define the interface of the component
Add other metadata of the component, including run-time environment, command to run the component, and etc.
The next section will show the create components in two different ways: the first two components using Python function and the third component using YAML definition.
Create the data-preparation component
The first component in this pipeline will convert the compressed data files of fashion_ds
into two csv files, one for training and the other for scoring. You'll use Python function to define this component.
If you're following along with the example in the Azure Machine Learning examples repo, the source files are already available in prep/
folder. This folder contains two files to construct the component: prep_component.py
, which defines the component and conda.yaml
, which defines the run-time environment of the component.
Define component using Python function
By using command_component()
function as a decorator, you can easily define the component's interface, metadata and code to execute from a Python function. Each decorated Python function will be transformed into a single static specification (YAML) that the pipeline service can process.
# Converts MNIST-formatted files at the passed-in input path to training data output path and test data output path
import os
from pathlib import Path
from mldesigner import command_component, Input, Output
@command_component(
name="prep_data",
version="1",
display_name="Prep Data",
description="Convert data to CSV file, and split to training and test data",
environment=dict(
conda_file=Path(__file__).parent / "conda.yaml",
image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
),
)
def prepare_data_component(
input_data: Input(type="uri_folder"),
training_data: Output(type="uri_folder"),
test_data: Output(type="uri_folder"),
):
convert(
os.path.join(input_data, "train-images-idx3-ubyte"),
os.path.join(input_data, "train-labels-idx1-ubyte"),
os.path.join(training_data, "mnist_train.csv"),
60000,
)
convert(
os.path.join(input_data, "t10k-images-idx3-ubyte"),
os.path.join(input_data, "t10k-labels-idx1-ubyte"),
os.path.join(test_data, "mnist_test.csv"),
10000,
)
def convert(imgf, labelf, outf, n):
f = open(imgf, "rb")
l = open(labelf, "rb")
o = open(outf, "w")
f.read(16)
l.read(8)
images = []
for i in range(n):
image = [ord(l.read(1))]
for j in range(28 * 28):
image.append(ord(f.read(1)))
images.append(image)
for image in images:
o.write(",".join(str(pix) for pix in image) + "\n")
f.close()
o.close()
l.close()
The code above define a component with display name Prep Data
using @command_component
decorator:
name
is the unique identifier of the component.version
is the current version of the component. A component can have multiple versions.display_name
is a friendly display name of the component in UI, which isn't unique.description
usually describes what task this component can complete.environment
specifies the run-time environment for this component. The environment of this component specifies a docker image and refers to theconda.yaml
file.The
conda.yaml
file contains all packages used for the component like following:name: imagekeras_prep_conda_env channels: - defaults dependencies: - python=3.7.11 - pip=20.0 - pip: - mldesigner==0.1.0b4
The
prepare_data_component
function defines one input forinput_data
and two outputs fortraining_data
andtest_data
.input_data
is input data path.training_data
andtest_data
are output data paths for training data and test data.This component converts the data from
input_data
into a training data csv totraining_data
and a test data csv totest_data
.
Following is what a component looks like in the studio UI.
- A component is a block in a pipeline graph.
- The
input_data
,training_data
andtest_data
are ports of the component, which connects to other components for data streaming.
Now, you've prepared all source files for the Prep Data
component.
Create the train-model component
In this section, you'll create a component for training the image classification model in the Python function like the Prep Data
component.
The difference is that since the training logic is more complicated, you can put the original training code in a separate Python file.
The source files of this component are under train/
folder in the Azure Machine Learning examples repo. This folder contains three files to construct the component:
train.py
: contains the actual logic to train model.train_component.py
: defines the interface of the component and imports the function intrain.py
.conda.yaml
: defines the run-time environment of the component.
Get a script containing execution logic
The train.py
file contains a normal Python function, which performs the training model logic to train a Keras neural network for image classification. To view the code, see the train.py file on GitHub.
Define component using Python function
After defining the training function successfully, you can use @command_component
in Azure Machine Learning SDK v2 to wrap your function as a component, which can be used in Azure Machine Learning pipelines.
import os
from pathlib import Path
from mldesigner import command_component, Input, Output
@command_component(
name="train_image_classification_keras",
version="1",
display_name="Train Image Classification Keras",
description="train image classification with keras",
environment=dict(
conda_file=Path(__file__).parent / "conda.yaml",
image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
),
)
def keras_train_component(
input_data: Input(type="uri_folder"),
output_model: Output(type="uri_folder"),
epochs=10,
):
# avoid dependency issue, execution logic is in train() func in train.py file
from train import train
train(input_data, output_model, epochs)
The code above define a component with display name Train Image Classification Keras
using @command_component
:
- The
keras_train_component
function defines one inputinput_data
where training data comes from, one inputepochs
specifying epochs during training, and one outputoutput_model
where outputs the model file. The default value ofepochs
is 10. The execution logic of this component is fromtrain()
function intrain.py
above.
The train-model component has a slightly more complex configuration than the prep-data component. The conda.yaml
is like following:
name: imagekeras_train_conda_env
channels:
- defaults
dependencies:
- python=3.8
- pip=20.2
- pip:
- mldesigner==0.1.0b12
- azureml-mlflow==1.50.0
- tensorflow==2.7.0
- numpy==1.21.4
- scikit-learn==1.0.1
- pandas==1.3.4
- matplotlib==3.2.2
- protobuf==3.20.0
Now, you've prepared all source files for the Train Image Classification Keras
component.
Create the score-model component
In this section, other than the previous components, you'll create a component to score the trained model via Yaml specification and script.
If you're following along with the example in the Azure Machine Learning examples repo, the source files are already available in score/
folder. This folder contains three files to construct the component:
score.py
: contains the source code of the component.score.yaml
: defines the interface and other details of the component.conda.yaml
: defines the run-time environment of the component.
Get a script containing execution logic
The score.py
file contains a normal Python function, which performs the training model logic.
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.utils import to_categorical
from keras.callbacks import Callback
from keras.models import load_model
import argparse
from pathlib import Path
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import mlflow
def get_file(f):
f = Path(f)
if f.is_file():
return f
else:
files = list(f.iterdir())
if len(files) == 1:
return files[0]
else:
raise Exception("********This path contains more than one file*******")
def parse_args():
# setup argparse
parser = argparse.ArgumentParser()
# add arguments
parser.add_argument(
"--input_data", type=str, help="path containing data for scoring"
)
parser.add_argument(
"--input_model", type=str, default="./", help="input path for model"
)
parser.add_argument(
"--output_result", type=str, default="./", help="output path for model"
)
# parse args
args = parser.parse_args()
# return args
return args
def score(input_data, input_model, output_result):
test_file = get_file(input_data)
data_test = pd.read_csv(test_file, header=None)
img_rows, img_cols = 28, 28
input_shape = (img_rows, img_cols, 1)
# Read test data
X_test = np.array(data_test.iloc[:, 1:])
y_test = to_categorical(np.array(data_test.iloc[:, 0]))
X_test = (
X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
)
# Load model
files = [f for f in os.listdir(input_model) if f.endswith(".h5")]
model = load_model(input_model + "/" + files[0])
# Log metrics of the model
eval = model.evaluate(X_test, y_test, verbose=0)
mlflow.log_metric("Final test loss", eval[0])
print("Test loss:", eval[0])
mlflow.log_metric("Final test accuracy", eval[1])
print("Test accuracy:", eval[1])
# Score model using test data
y_predict = model.predict(X_test)
y_result = np.argmax(y_predict, axis=1)
# Output result
np.savetxt(output_result + "/predict_result.csv", y_result, delimiter=",")
def main(args):
score(args.input_data, args.input_model, args.output_result)
# run script
if __name__ == "__main__":
# parse args
args = parse_args()
# call main function
main(args)
The code in score.py takes three command-line arguments: input_data
, input_model
and output_result
. The program score the input model using input data and then output the scoring result.
Define component via Yaml
In this section, you'll learn to create a component specification in the valid YAML component specification format. This file specifies the following information:
- Metadata: name, display_name, version, type, and so on.
- Interface: inputs and outputs
- Command, code, & environment: The command, code, and environment used to run the component
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command
name: score_image_classification_keras
display_name: Score Image Classification Keras
inputs:
input_data:
type: uri_folder
input_model:
type: uri_folder
outputs:
output_result:
type: uri_folder
code: ./
command: python score.py --input_data ${{inputs.input_data}} --input_model ${{inputs.input_model}} --output_result ${{outputs.output_result}}
environment:
conda_file: ./conda.yaml
image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
name
is the unique identifier of the component. Its display name isScore Image Classification Keras
.- This component has two inputs and one output.
- The source code path of it's defined in the
code
section and when the component is run in cloud, all files from that path will be uploaded as the snapshot of this component. - The
command
section specifies the command to execute while running this component. - The
environment
section contains a docker image and a conda yaml file. The source file is in the sample repository.
Now, you've got all source files for score-model component.
Load components to build pipeline
For prep-data component and train-model component defined by Python function, you can import the components just like normal Python functions.
In the following code, you import prepare_data_component()
and keras_train_component()
function from the prep_component.py
file under prep
folder and train_component
file under train
folder respectively.
%load_ext autoreload
%autoreload 2
# load component function from component python file
from prep.prep_component import prepare_data_component
from train.train_component import keras_train_component
# print hint of components
help(prepare_data_component)
help(keras_train_component)
For score component defined by yaml, you can use load_component()
function to load.
# load component function from yaml
keras_score_component = load_component(source="./score/score.yaml")
Build your pipeline
Now that you've created and loaded all components and input data to build the pipeline. You can compose them into a pipeline:
Note
To use serverless compute, add from azure.ai.ml.entities import ResourceConfiguration
to the top.
Then replace:
default_compute=cpu_compute_target,
withdefault_compute="serverless",
train_node.compute = gpu_compute_target
withtrain_node.resources = "ResourceConfiguration(instance_type="Standard_NC6s_v3",instance_count=2)
# define a pipeline containing 3 nodes: Prepare data node, train node, and score node
@pipeline(
default_compute=cpu_compute_target,
)
def image_classification_keras_minist_convnet(pipeline_input_data):
"""E2E image classification pipeline with keras using python sdk."""
prepare_data_node = prepare_data_component(input_data=pipeline_input_data)
train_node = keras_train_component(
input_data=prepare_data_node.outputs.training_data
)
train_node.compute = gpu_compute_target
score_node = keras_score_component(
input_data=prepare_data_node.outputs.test_data,
input_model=train_node.outputs.output_model,
)
# create a pipeline
pipeline_job = image_classification_keras_minist_convnet(pipeline_input_data=mnist_ds)
The pipeline has a default compute cpu_compute_target
, which means if you don't specify compute for a specific node, that node will run on the default compute.
The pipeline has a pipeline level input pipeline_input_data
. You can assign value to pipeline input when you submit a pipeline job.
The pipeline contains three nodes, prepare_data_node, train_node and score_node.
The
input_data
ofprepare_data_node
uses the value ofpipeline_input_data
.The
input_data
oftrain_node
is from thetraining_data
output of the prepare_data_node.The
input_data
of score_node is from thetest_data
output of prepare_data_node, and theinput_model
is from theoutput_model
of train_node.Since
train_node
will train a CNN model, you can specify its compute as the gpu_compute_target, which can improve the training performance.
Submit your pipeline job
Now you've constructed the pipeline, you can submit to your workspace. To submit a job, you need to firstly connect to a workspace.
Get access to your workspace
Configure credential
We'll use DefaultAzureCredential
to get access to the workspace. DefaultAzureCredential
should be capable of handling most Azure SDK authentication scenarios.
Reference for more available credentials if it doesn't work for you: configure credential example, azure-identity reference doc.
try:
credential = DefaultAzureCredential()
# Check if given credential can get token successfully.
credential.get_token("https://management.azure.com/.default")
except Exception as ex:
# Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
credential = InteractiveBrowserCredential()
Get a handle to a workspace with compute
Create a MLClient
object to manage Azure Machine Learning services. If you use serverless compute then there's no need to create these computes.
# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)
# Retrieve an already attached Azure Machine Learning Compute.
cpu_compute_target = "cpu-cluster"
print(ml_client.compute.get(cpu_compute_target))
gpu_compute_target = "gpu-cluster"
print(ml_client.compute.get(gpu_compute_target))
Important
This code snippet expects the workspace configuration json file to be saved in the current directory or its parent. For more information on creating a workspace, see Create workspace resources. For more information on saving the configuration to file, see Create a workspace configuration file.
Submit pipeline job to workspace
Now you've get a handle to your workspace, you can submit your pipeline job.
pipeline_job = ml_client.jobs.create_or_update(
pipeline_job, experiment_name="pipeline_samples"
)
pipeline_job
The code above submit this image classification pipeline job to experiment called pipeline_samples
. It will auto create the experiment if not exists. The pipeline_input_data
uses fashion_ds
.
The call to pipeline_job
produces output similar to:
The call to submit
the Experiment
completes quickly, and produces output similar to:
Experiment | Name | Type | Status | Details Page |
---|---|---|---|---|
pipeline_samples | sharp_pipe_4gvqx6h1fb | pipeline | Preparing | Link to Azure Machine Learning studio. |
You can monitor the pipeline run by opening the link or you can block until it completes by running:
# wait until the job completes
ml_client.jobs.stream(pipeline_job.name)
Important
The first pipeline run takes roughly 15 minutes. All dependencies must be downloaded, a Docker image is created, and the Python environment is provisioned and created. Running the pipeline again takes significantly less time because those resources are reused instead of created. However, total run time for the pipeline depends on the workload of your scripts and the processes that are running in each pipeline step.
Checkout outputs and debug your pipeline in UI
You can open the Link to Azure Machine Learning studio
, which is the job detail page of your pipeline. You'll see the pipeline graph like following.
You can check the logs and outputs of each component by right clicking the component, or select the component to open its detail pane. To learn more about how to debug your pipeline in UI, see How to use debug pipeline failure.
(Optional) Register components to workspace
In the previous section, you have built a pipeline using three components to E2E complete an image classification task. You can also register components to your workspace so that they can be shared and resued within the workspace. Following is an example to register prep-data component.
try:
# try get back the component
prep = ml_client.components.get(name="prep_data", version="1")
except:
# if not exists, register component using following code
prep = ml_client.components.create_or_update(prepare_data_component)
# list all components registered in workspace
for c in ml_client.components.list():
print(c)
Using ml_client.components.get()
, you can get a registered component by name and version. Using ml_client.components.create_or_update()
, you can register a component previously loaded from Python function or yaml.
Next steps
- For more examples of how to build pipelines by using the machine learning SDK, see the example repository.
- For how to use studio UI to submit and debug your pipeline, refer to how to create pipelines using component in the UI.
- For how to use Azure Machine Learning CLI to create components and pipelines, refer to how to create pipelines using component with CLI.
- For how to deploy pipelines into production using Batch Endpoints, see how to deploy pipelines with batch endpoints.