チュートリアル: 画像分類用の Azure Machine Learning パイプラインを構築する

適用対象:Python SDK azureml v1

注意

SDK v2 を使用してパイプラインを構築するチュートリアルについては、チュートリアル: Jupyter Notebook での Python SDK v2 を使用した運用 ML ワークフローの ML パイプラインの使用に関するページを参照してください。

このチュートリアルでは、Azure Machine Learning パイプラインを構築してデータを準備し、機械学習モデルをトレーニングする方法について説明します。 機械学習パイプラインは、速度、移植性、再利用によってワークフローを最適化します。これにより、ユーザーはインフラストラクチャや自動化にではなく、機械学習に専念することができます。

この例では、Fashion MNIST データセットの画像を分類するために、小規模な Keras の畳み込みニューラル ネットワークをトレーニングします。

このチュートリアルでは、次のタスクを実行します。

  • ワークスペースの構成
  • 作業を保持するための実験を作成する
  • 作業を行うためのコンピューティング先をプロビジョニングする
  • 圧縮データを格納するためのデータセットを作成する
  • トレーニング用のデータを準備するためのパイプライン ステップを作成する
  • トレーニングを実行するランタイム環境を定義する
  • ニューラル ネットワークを定義し、トレーニングを実行するためのパイプライン ステップを作成する
  • パイプライン ステップからパイプラインを構成する
  • 実験でパイプラインを実行する
  • ステップとトレーニングされたニューラル ネットワークの出力を確認する
  • 今後も使用するためにモデルを登録する

Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。 無料版または有料版の Azure Machine Learning を今すぐお試しください。

前提条件

  • Azure Machine Learning ワークスペースがまだない場合は、「作業を開始するために必要なリソースを作成する」の手順を完了してください。
  • azureml-coreazureml-pipeline の両方のパッケージをインストールした Python 環境。 この環境は、Azure Machine Learning リソースを定義および制御するためのものであり、トレーニングのためにランタイムに使用される環境とは分離されています。

重要

現在、azureml-pipeline と互換性のある最新の Python リリースは Python 3.8 です。 azureml-pipeline パッケージのインストールが困難な場合は、python --version が互換性のあるリリースであることを確認してください。 手順については、Python 仮想環境マネージャー (venvconda など) のドキュメントを参照してください。

対話型の Python セッションを開始する

このチュートリアルでは、Azure Machine Learning 用の Python SDK を使用して、Azure Machine Learning パイプラインを作成および制御します。 このチュートリアルでは、Python REPL 環境または Jupyter ノートブックでコード スニペットを対話的に実行することを前提としています。

  • このチュートリアルは、Azure Machine Learning の例 のリポジトリの python-sdk/tutorial/using-pipelines ディレクトリにある image-classification.ipynb ノートブックに基づいています。 ステップ自体のソース コードは、keras-mnist-fashion サブディレクトリにあります。

インポートの種類

このチュートリアルに必要なすべての種類の Azure Machine Learning をインポートします。

import os
import azureml.core
from azureml.core import (
    Workspace,
    Experiment,
    Dataset,
    Datastore,
    ComputeTarget,
    Environment,
    ScriptRunConfig
)
from azureml.data import OutputFileDatasetConfig
from azureml.core.compute import AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline

# check core SDK version number
print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)

Azure Machine Learning SDK のバージョンは 1.37 以上である必要があります。 そうでない場合は、pip install --upgrade azureml-core を使用してアップグレードします。

ワークスペースの構成

既存の Azure Machine Learning ワークスペースからワークスペース オブジェクトを作成します。

workspace = Workspace.from_config()

重要

このコード スニペットでは、ワークスペースの構成が現在のディレクトリまたはその親に保存されていることを想定しています。 ワークスペースの作成方法について詳しくは、ワークスペース リソースの作成に関するページを参照してください。 構成をファイルに保存する方法について詳しくは、「ワークスペース構成ファイルを作成する」を参照してください。

パイプラインのインフラストラクチャを作成する

パイプラインの実行結果を保持する Experiment オブジェクトを作成します。

exp = Experiment(workspace=workspace, name="keras-mnist-fashion")

パイプラインを実行するマシン リソースを表す ComputeTarget を作成します。 このチュートリアルで使用している単純なニューラル ネットワークは、CPU ベースのマシンでもほんの数分でトレーニングを行います。 トレーニングに GPU を使用する場合は、use_gpuTrue に設定します。 コンピューティング先のプロビジョニングには、通常、約 5 分かかります。

use_gpu = False

# choose a name for your cluster
cluster_name = "gpu-cluster" if use_gpu else "cpu-cluster"

found = False
# Check if this compute target already exists in the workspace.
cts = workspace.compute_targets
if cluster_name in cts and cts[cluster_name].type == "AmlCompute":
    found = True
    print("Found existing compute target.")
    compute_target = cts[cluster_name]
if not found:
    print("Creating a new compute target...")
    compute_config = AmlCompute.provisioning_configuration(
        vm_size= "STANDARD_NC6" if use_gpu else "STANDARD_D2_V2"
        # vm_priority = 'lowpriority', # optional
        max_nodes=4,
    )

    # Create the cluster.
    compute_target = ComputeTarget.create(workspace, cluster_name, compute_config)

    # Can poll for a minimum number of nodes and for a specific timeout.
    # If no min_node_count is provided, it will use the scale settings for the cluster.
    compute_target.wait_for_completion(
        show_output=True, min_node_count=None, timeout_in_minutes=10
    )
# For a more detailed view of current AmlCompute status, use get_status().print(compute_target.get_status().serialize())

注意

GPU の可用性は、Azure サブスクリプションのクォータと Azure の容量によって異なります。 「Azure Machine Learning を使用するリソースのクォータの管理と引き上げ」を参照してください。

Azure に格納されたデータのデータセットを作成する

Fashion-MNIST は、10 のクラスに分割されたファッションの画像のデータセットです。 各画像は 28x28 のグレースケール画像であり、6 万のトレーニングと 1 万のテストの画像があります。 Fashion-MNIST には従来の MNIST の手書き数字データベースよりも画像分類が難しいという問題があります。 これは、元の手書き数字データベースと同じ圧縮されたバイナリ形式で配布されます。

Web ベースのデータを参照する Dataset を作成するには、次を実行します。

data_urls = ["https://data4mldemo6150520719.blob.core.windows.net/demo/mnist-fashion"]
fashion_ds = Dataset.File.from_files(data_urls)

# list the files referenced by fashion_ds
print(fashion_ds.to_path())

このコードはすぐに完了します。 基になるデータは、data_urls 配列で指定された Azure ストレージ リソースに残ります。

データ準備パイプライン ステップを作成する

このパイプラインの最初のステップでは、fashion_ds の圧縮データ ファイルを、トレーニングで使用できる CSV ファイルで構成される独自のワークスペース内のデータセットに変換します。 ワークスペースに登録すると、コラボレーターはこのデータにアクセスして、独自の分析やトレーニングなどを行うことができます

datastore = workspace.get_default_datastore()
prepared_fashion_ds = OutputFileDatasetConfig(
    destination=(datastore, "outputdataset/{run-id}")
).register_on_complete(name="prepared_fashion_ds")

上のコードでは、パイプライン ステップの出力に基づいたデータセットが指定されます。 基になる処理済みファイルは、destination で指定したパスにあるワークスペースの既定のデータストアの BLOB ストレージに格納されます。 データセットは、prepared_fashion_ds という名前のワークスペースに登録されます。

パイプラインのステップのソースを作成する

これまでに実行したコードでは、Azure リソースの作成と制御が行われています。 ここでは、ドメインの最初のステップを実行するコードを記述します。

Azure Machine Learning の例のリポジトリの例に従っている場合、ソース ファイルは既に keras-mnist-fashion/prepare.py として使用できます。

最初から作成している場合は、keras-mnist-fashion/ という名前のサブディレクトリを作成します。 新しいファイルを作成し、次のコードを追加して、ファイルに prepare.py という名前を指定します。

# prepare.py
# Converts MNIST-formatted files at the passed-in input path to a passed-in output path
import os
import sys

# Conversion routine for MNIST binary format
def convert(imgf, labelf, outf, n):
    f = open(imgf, "rb")
    l = open(labelf, "rb")
    o = open(outf, "w")

    f.read(16)
    l.read(8)
    images = []

    for i in range(n):
        image = [ord(l.read(1))]
        for j in range(28 * 28):
            image.append(ord(f.read(1)))
        images.append(image)

    for image in images:
        o.write(",".join(str(pix) for pix in image) + "\n")
    f.close()
    o.close()
    l.close()

# The MNIST-formatted source
mounted_input_path = sys.argv[1]
# The output directory at which the outputs will be written
mounted_output_path = sys.argv[2]

# Create the output directory
os.makedirs(mounted_output_path, exist_ok=True)

# Convert the training data
convert(
    os.path.join(mounted_input_path, "mnist-fashion/train-images-idx3-ubyte"),
    os.path.join(mounted_input_path, "mnist-fashion/train-labels-idx1-ubyte"),
    os.path.join(mounted_output_path, "mnist_train.csv"),
    60000,
)

# Convert the test data
convert(
    os.path.join(mounted_input_path, "mnist-fashion/t10k-images-idx3-ubyte"),
    os.path.join(mounted_input_path, "mnist-fashion/t10k-labels-idx1-ubyte"),
    os.path.join(mounted_output_path, "mnist_test.csv"),
    10000,
)

prepare.py のコードは 2 つのコマンドライン引数を取ります。1 つ目は mounted_input_path に、2 つ目は mounted_output_path に代入されます。 そのサブディレクトリが存在しない場合は、os.makedirs の呼び出しによって作成されます。 次に、プログラムによってトレーニングとテストのデータが変換され、コンマ区切りファイルが mounted_output_path に出力されます。

パイプラインのステップを指定する

パイプラインを指定するために使用している Python 環境に戻り、次のコードを実行して準備コード用に PythonScriptStep を作成します。

script_folder = "./keras-mnist-fashion"

prep_step = PythonScriptStep(
    name="prepare step",
    script_name="prepare.py",
    # On the compute target, mount fashion_ds dataset as input, prepared_fashion_ds as output
    arguments=[fashion_ds.as_named_input("fashion_ds").as_mount(), prepared_fashion_ds],
    source_directory=script_folder,
    compute_target=compute_target,
    allow_reuse=True,
)

PythonScriptStep を呼び出すと、パイプライン ステップが実行されるときに次のことが指定されます。

  • script_folder ディレクトリのすべてのファイルが compute_target にアップロードされます
  • アップロードされたソース ファイルの中で、ファイル prepare.py が実行されます
  • fashion_ds および prepared_fashion_ds のデータセットが compute_target にマウントされ、ディレクトリとして表示されます
  • fashion_ds ファイルのパスは、prepare.py の最初の引数になります。 prepare.py では、この引数は mounted_input_path に代入されます
  • prepared_fashion_ds のパスは prepare.py の 2 番目の引数になります。 prepare.py では、この引数は mounted_output_path に代入されます
  • allow_reuseTrue であるため、ソース ファイルまたは入力が変更されるまで再実行されません
  • この PythonScriptStep には prepare step という名前が付けられます

パイプラインの主な利点は、モジュール性と再利用性です。 Azure Machine Learning によって、ソース コードまたはデータセットの変更が自動的に決定されます。 allow_reuseTrue の場合、影響を受けていないステップの出力はステップの再実行なしに再利用されます。 あるステップが、変更される可能性のある Azure Machine Learning の外部にあるデータ ソース (たとえば、販売データを含む URL) に依存している場合、allow_reuseFalse に設定すると、パイプライン ステップはパイプラインが実行されるたびに実行されます。

トレーニング スクリプトを作成する

圧縮形式から CSV ファイルに変換されたデータは、畳み込みニューラル ネットワークのトレーニングに使用できます。

トレーニング ステップのソースを作成する

パイプラインが大きいほど、各ステップのソース コードを別のディレクトリ (src/prepare/src/train/ など) に配置することをお勧めします。ただし、このチュートリアルでは、同じ keras-mnist-fashion/ ソース ディレクトリ内のファイル train.py を単に使用するか、作成します。

import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.layers.normalization import BatchNormalization
from keras.utils import to_categorical
from keras.callbacks import Callback

import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from azureml.core import Run

# dataset object from the run
run = Run.get_context()
dataset = run.input_datasets["prepared_fashion_ds"]

# split dataset into train and test set
(train_dataset, test_dataset) = dataset.random_split(percentage=0.8, seed=111)

# load dataset into pandas dataframe
data_train = train_dataset.to_pandas_dataframe()
data_test = test_dataset.to_pandas_dataframe()

img_rows, img_cols = 28, 28
input_shape = (img_rows, img_cols, 1)

X = np.array(data_train.iloc[:, 1:])
y = to_categorical(np.array(data_train.iloc[:, 0]))

# here we split validation data to optimiza classifier during training
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=13)

# test data
X_test = np.array(data_test.iloc[:, 1:])
y_test = to_categorical(np.array(data_test.iloc[:, 0]))


X_train = (
    X_train.reshape(X_train.shape[0], img_rows, img_cols, 1).astype("float32") / 255
)
X_test = X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
X_val = X_val.reshape(X_val.shape[0], img_rows, img_cols, 1).astype("float32") / 255

batch_size = 256
num_classes = 10
epochs = 10

# construct neuron network
model = Sequential()
model.add(
    Conv2D(
        32,
        kernel_size=(3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        input_shape=input_shape,
    )
)
model.add(MaxPooling2D((2, 2)))
model.add(Dropout(0.25))
model.add(Conv2D(64, (3, 3), activation="relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Conv2D(128, (3, 3), activation="relu"))
model.add(Dropout(0.4))
model.add(Flatten())
model.add(Dense(128, activation="relu"))
model.add(Dropout(0.3))
model.add(Dense(num_classes, activation="softmax"))

model.compile(
    loss=keras.losses.categorical_crossentropy,
    optimizer=keras.optimizers.Adam(),
    metrics=["accuracy"],
)

# start an Azure ML run
run = Run.get_context()


class LogRunMetrics(Callback):
    # callback at the end of every epoch
    def on_epoch_end(self, epoch, log):
        # log a value repeated which creates a list
        run.log("Loss", log["loss"])
        run.log("Accuracy", log["accuracy"])


history = model.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    verbose=1,
    validation_data=(X_val, y_val),
    callbacks=[LogRunMetrics()],
)

score = model.evaluate(X_test, y_test, verbose=0)

# log a single value
run.log("Final test loss", score[0])
print("Test loss:", score[0])

run.log("Final test accuracy", score[1])
print("Test accuracy:", score[1])

plt.figure(figsize=(6, 3))
plt.title("Fashion MNIST with Keras ({} epochs)".format(epochs), fontsize=14)
plt.plot(history.history["accuracy"], "b-", label="Accuracy", lw=4, alpha=0.5)
plt.plot(history.history["loss"], "r--", label="Loss", lw=4, alpha=0.5)
plt.legend(fontsize=12)
plt.grid(True)

# log an image
run.log_image("Loss v.s. Accuracy", plot=plt)

# create a ./outputs/model folder in the compute target
# files saved in the "./outputs" folder are automatically uploaded into run history
os.makedirs("./outputs/model", exist_ok=True)

# serialize NN architecture to JSON
model_json = model.to_json()
# save model JSON
with open("./outputs/model/model.json", "w") as f:
    f.write(model_json)
# save model weights
model.save_weights("./outputs/model/model.h5")
print("model saved in ./outputs/model folder")

このコードのほとんどは、ML の開発者にとっては馴染みがあるはずです。

  • データはトレーニング用のトレーニングと検証のセット、そして最終的なスコアリングのための別のテスト サブセットにパーティション分割されます
  • 入力図形は 28x28x1 (1 は入力がグレースケールであるため) であり、バッチに 256 の入力があり、10 のクラスがあります
  • トレーニング エポックの数は 10 になります
  • モデルには、最大プーリングと Dropout を含む 3 つの畳み込みレイヤーと、その後に Dense レイヤーとソフトマックス ヘッドが含まれます
  • モデルが 10 のエポックに適合され、評価されます
  • モデルのアーキテクチャが outputs/model/model.json に書き込まれ、outputs/model/model.h5 に重み付けされます

ただし、コードの一部は Azure Machine Learning に固有のものです。 run = Run.get_context() は、現在のサービス コンテキストを含む Run オブジェクトを取得します。 train.py ソースはこの run オブジェクトを使用して、名前から入力データセットを取得します (スクリプト引数の argv の配列からデータセットを取得した prepare.py のコードの代わり)。

run オブジェクトも、すべてのエポックの終了時にトレーニングの進行状況を記録し、トレーニングの終わりに時間の経過に応じた損失と正確性のグラフを記録するために使用されます。

トレーニングのパイプライン ステップを作成する

トレーニング ステップには、準備ステップよりも少し複雑な構成が含まれます。 準備ステップでは、標準の Python ライブラリのみが使用されていました。 より一般的には、ソース コードを実行するランタイム環境を変更する必要があります。

次の内容を含むファイル conda_dependencies.yml を作成します。

dependencies:
- python=3.7
- pip:
  - azureml-core
  - azureml-dataset-runtime
  - keras==2.4.3
  - tensorflow==2.4.3
  - numpy
  - scikit-learn
  - pandas
  - matplotlib

Environment クラスは、機械学習のタスクが実行されるランタイム環境を表します。 上記の仕様を次を使用してトレーニング コードに関連付けます。

keras_env = Environment.from_conda_specification(
    name="keras-env", file_path="./conda_dependencies.yml"
)

train_cfg = ScriptRunConfig(
    source_directory=script_folder,
    script="train.py",
    compute_target=compute_target,
    environment=keras_env,
)

トレーニング ステップ自体を作成するには、準備ステップの作成に使用したコードと同様のコードを使用します。

train_step = PythonScriptStep(
    name="train step",
    arguments=[
        prepared_fashion_ds.read_delimited_files().as_input(name="prepared_fashion_ds")
    ],
    source_directory=train_cfg.source_directory,
    script_name=train_cfg.script,
    runconfig=train_cfg.run_config,
)

パイプラインを作成して実行する

データの入力と出力を指定し、パイプラインのステップを作成したので、パイプラインにそれらを構成して実行します。

pipeline = Pipeline(workspace, steps=[prep_step, train_step])
run = exp.submit(pipeline)

作成した Pipeline オブジェクトは、workspace で実行され、指定した準備とトレーニングのステップで構成されます。

注意

このパイプラインには単純な依存関係グラフがあります。トレーニング ステップは準備ステップに依存し、準備ステップは fashion_ds データセットに依存します。 実稼働パイプラインには、多くの場合、より複雑な依存関係があります。 ステップが複数の上流ステップに依存している場合があります。初期段階でのソースコードの変更は、はるかに広範囲に及ぶ可能性があります。 Azure Machine Learning が、これらの問題を追跡します。 steps の配列で渡すだけで、Azure Machine Learning が実行グラフの計算を行います。

Experimentsubmit の呼び出しが迅速完了し、次のような出力が生成されます。

Submitted PipelineRun 5968530a-abcd-1234-9cc1-46168951b5eb
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/abc-xyz...

パイプラインの実行を監視するには、リンクを開くか、次を実行して完了するまでブロックします。

run.wait_for_completion(show_output=True)

重要

パイプラインの初回実行には約 "15 分" かかります。 依存関係をすべてダウンロードする必要があるほか、Docker イメージの作成と Python 環境のプロビジョニングおよび作成が行われます。 パイプラインを再度実行する際は、それらのリソースが作成されるのではなく再利用されるので、時間が大幅に短縮されます。 ただし、パイプラインの総実行時間は、実際のスクリプトのワークロードと、各パイプライン ステップで実行される処理によって異なります。

パイプラインが完了すると、トレーニング ステップで記録されたメトリックを取得できます。

run.find_step_run("train step")[0].get_metrics()

メトリックに問題がなければ、ワークスペースにモデルを登録できます。

run.find_step_run("train step")[0].register_model(
    model_name="keras-model",
    model_path="outputs/model/",
    datasets=[("train test data", fashion_ds)],
)

リソースをクリーンアップする

Azure Machine Learning の他のチュートリアルを実行する予定の場合、このセクションは実行しないでください。

コンピューティング インスタンスの停止

コンピューティング インスタンスを使用していた場合は、使用していない VM を停止してコストを削減します。

  1. お使いのワークスペースで、 [コンピューティング] を選択します。

  2. 一覧から、コンピューティング インスタンスの名前を選択します。

  3. [停止] を選択します。

  4. サーバーを再び使用する準備が整ったら、 [開始] を選択します。

すべてを削除する

作成したリソースを今後使用する予定がない場合は、課金が発生しないように削除します。

  1. Azure portal で、左側のメニューにある [リソース グループ] を選択します。
  2. リソース グループの一覧で、自分が作成したリソース グループを選択します。
  3. [リソース グループの削除] を選択します。
  4. リソース グループ名を入力します。 次に、 [削除] を選択します。

リソース グループは保持しつつ、いずれかのワークスペースを削除することもできます。 ワークスペースのプロパティを表示し、 [削除] を選択します。

次のステップ

このチュートリアルでは、次の型を使用しました。

  • Workspace は Azure Machine Learning ワークスペースを表します。 次を含んでいました。
    • パイプラインのトレーニング実行の結果を含む Experiment
    • Fashion-MNIST データストアに保持されているデータを遅延読み込みした Dataset
    • パイプライン ステップが実行されるマシンを表す ComputeTarget
    • パイプライン ステップが実行されるランタイム環境である Environment
    • PythonScriptStep ステップを全体に構成する Pipeline
    • トレーニング プロセスの完了後に登録した Model

Workspace オブジェクトには、このチュートリアルで使用しなかった他のリソース (ノートブック、エンドポイントなど) への参照が含まれています。 詳細については、「Azure Machine Learning ワークスペースとは」を参照してください。

OutputFileDatasetConfig は実行の出力をファイルベースのデータセットに昇格させます。 データセットとデータの操作の詳細については、データへのアクセス方法に関する記事を参照してください。

コンピューティング先と環境の詳細については、「Azure Machine Learning でのコンピューティング ターゲットとは」と「Azure Machine Learning 環境とは?」を参照してください

ScriptRunConfigComputeTargetEnvironment を Python ソース ファイルに関連付けます。 PythonScriptStep はその ScriptRunConfig を受け取り、その入力と出力を定義します。このパイプラインでは、OutputFileDatasetConfig によって作成されたファイル データセットです。

機械学習 SDK を使用してパイプラインを作成するその他の例については、例のリポジトリを参照してください。