パイプラインを SDK v2 にアップグレードする

SDK v2 では、"パイプライン" がジョブに統合されます。

ジョブには種類があります。 ほとんどのジョブは、command (python main.py など) を実行するコマンド ジョブです。 ジョブで実行されるものはどのプログラミング言語にも依存しないため、bash スクリプトの実行、python インタープリターの呼び出し、一連の curl コマンドの実行などが行えます。

pipeline は、もう 1 つの種類のジョブです。これは、入出力リレーションシップを持つ可能性のある子ジョブを定義し、有向非巡回グラフ (DAG) を形成します。

アップグレードするには、パイプラインを定義して SDK v2 に送信するためのコードを変更する必要があります。 子ジョブ "内で" 実行するものを SDK v2 にアップグレードする必要はありません。 ただし、Azure Machine Learning に固有のコードは、モデル トレーニング スクリプトから削除することをお勧めします。 この分離により、ローカルとクラウドの切り替えが容易になり、成熟した MLOps のベスト プラクティスと見なされます。 実際には、これは azureml.* コード行の削除を意味します。 モデルのログと追跡コードは、MLflow に置き換える必要があります。 詳細については、v2 での MLflow の使用方法に関する記事をご覧ください。

この記事では、SDK v1 と SDK v2 のシナリオの比較を示します。 次の例では、ダミーのパイプライン ジョブ内に 3 つのステップ (トレーニング、スコア付け、評価) を作成します。 この例では、SDK v1 と SDK v2 を使用してパイプライン ジョブを構築する方法と、データを使用してステップ間でデータを転送する方法を示しています。

パイプラインの実行

  • SDK v1

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # built pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • SDK v2。 完全なサンプルへのリンク

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install mldesigner package to use command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

SDK v1 と SDK v2 の主要機能のマッピング

SDK v1 の機能 SDK v2 での大まかなマッピング
azureml.pipeline.core.Pipeline azure.ai.ml.dsl.pipeline
OutputDatasetConfig 出力
dataset as_mount 入力

ステップとジョブ/コンポーネントの種類のマッピング

SDK v1 のステップ SDK v2 のジョブの種類 SDK v2 のコンポーネントの種類
adla_step なし なし
automl_step automl ジョブ automl コンポーネント
azurebatch_step なし なし
command_step command ジョブ command コンポーネント
data_transfer_step なし None
databricks_step None なし
estimator_step command ジョブ command コンポーネント
hyper_drive_step sweep ジョブ なし
kusto_step None None
module_step なし command コンポーネント
mpi_step command ジョブ command コンポーネント
parallel_run_step Parallel ジョブ Parallel コンポーネント
python_script_step command ジョブ command コンポーネント
r_script_step command ジョブ command コンポーネント
synapse_spark_step spark ジョブ spark コンポーネント

公開済みパイプライン

パイプラインを起動して実行すると、パイプラインを発行することができるため、さまざまな入力で実行されます。 これは公開されたパイプラインと呼ばれていました。 バッチ エンドポイントでは、持続的な API で実行されている複数のアセットを処理するための同様のより強力な方法が提案されています。そのため、公開されたパイプラインの機能は、バッチ エンドポイントのパイプライン コンポーネント デプロイに移動されました。

バッチ エンドポイントは、インターフェイス (エンドポイント) を実際の実装 (デプロイ) から切り離し、エンドポイントの既定の実装を提供するデプロイをユーザーが決定できるようにします。 バッチ エンドポイントでパイプライン コンポーネントのデプロイを使用すると、ユーザーはパイプラインではなくパイプライン コンポーネントをデプロイできるようになります。これにより、MLOps プラクティスを効率化しようとしている組織向けに、再利用可能なアセットをより有効に使用できます。

次の表は、各概念の比較を示しています。

概念 SDK v1 SDK v2
呼び出し用のパイプラインの REST エンドポイント パイプライン エンドポイント バッチ エンドポイント
エンドポイント下のパイプラインの特定のバージョン 公開されたパイプライン パイプライン コンポーネント デプロイ
呼び出し時のパイプラインの引数 パイプライン パラメーター ジョブの入力
公開されたパイプラインから生成されるジョブ パイプライン ジョブ バッチ ジョブ

バッチ エンドポイントへの移行方法に関する具体的なガイダンスについては、「パイプライン エンドポイントを SDK v2 にアップグレードする」を参照してください。

詳しくは、こちらのドキュメントをご覧ください。