機械学習パイプラインを発行して追跡する

適用対象:Python SDK azureml v1

この記事では、機械学習パイプラインを同僚や顧客と共有する方法について説明します。

機械学習パイプラインは、機械学習タスクの再利用可能なワークフローです。 パイプラインの利点の 1 つは、コラボレーションの強化です。 パイプラインのバージョン管理を行うこともできます。これにより、新しいバージョンで作業しているときに、現在のモデルを使用することができます。

前提条件

パイプラインを発行する

パイプラインを起動して実行すると、パイプラインを発行することができるため、さまざまな入力で実行されます。 パラメータを受け入れるように既に発行されているパイプラインの REST エンドポイントの場合は、さまざまな引数に PipelineParameter オブジェクトを使用するようにパイプラインを構成する必要があります。

  1. パイプライン パラメーターを作成するには、既定の値で PipelineParameter オブジェクトを使用します。

    from azureml.pipeline.core.graph import PipelineParameter
    
    pipeline_param = PipelineParameter(
      name="pipeline_arg",
      default_value=10)
    
  2. 次のように、パイプラインのいずれかのステップにパラメーターとしてこの PipelineParameter オブジェクトを追加します。

    compareStep = PythonScriptStep(
      script_name="compare.py",
      arguments=["--comp_data1", comp_data1, "--comp_data2", comp_data2, "--output_data", out_data3, "--param1", pipeline_param],
      inputs=[ comp_data1, comp_data2],
      outputs=[out_data3],
      compute_target=compute_target,
      source_directory=project_folder)
    
  3. 呼び出されるときにパラメーターを受け取るこのパイプラインを発行します。

    published_pipeline1 = pipeline_run1.publish_pipeline(
         name="My_Published_Pipeline",
         description="My Published Pipeline Description",
         version="1.0")
    
  4. パイプラインを発行したら、UI で確認できます。 パイプライン ID は、発行されたパイプラインの一意の識別です。

    Screenshot showing published pipeline detail.

発行されたパイプラインを実行する

発行されたすべてのパイプラインに REST エンドポイントがあります。 パイプライン エンドポイントを使用すると、Python 以外のクライアントを含む任意の外部システムからパイプラインの実行をトリガーできます。 このエンドポイントでは、バッチ スコアリングと再トレーニングのシナリオでの "管理された再現性" が有効になります。

重要

Azure ロールベースのアクセス制御 (Azure RBAC) を使用してパイプラインへのアクセスを管理している場合は、パイプライン シナリオ (トレーニングまたはスコアリング) のアクセス許可を設定します。

前のパイプラインの実行を呼び出すには、Microsoft Entra 認証ヘッダー トークンが必要です。 このようなトークンの取得については、AzureCliAuthentication クラスに関するリファレンスとAuthentication in Azure Machine Learning (Azure Machine Learning での認証) に関するノートブックで説明されています。

from azureml.pipeline.core import PublishedPipeline
import requests

response = requests.post(published_pipeline1.endpoint,
                         headers=aad_token,
                         json={"ExperimentName": "My_Pipeline",
                               "ParameterAssignments": {"pipeline_arg": 20}})

POST 要求の json 引数には、ParameterAssignments キーの場合、パイプライン パラメーターとその値を含むディクショナリが含まれている必要があります。 また、json 引数には、次のキーを含めることができます。

Key 説明
ExperimentName このエンドポイントに関連付けられている実験の名前
Description エンドポイントを説明する自由形式のテキスト
Tags 要求のラベル付けと注釈付けに使用できる自由形式のキーと値のペア
DataSetDefinitionValueAssignments 再トレーニングせずにデータセットを変更するために使用されるディクショナリ (以下の説明を参照)
DataPathAssignments 再トレーニングせずにデータパスを変更するために使用されるディクショナリ (以下の説明を参照)

C# を使用して発行されたパイプラインを実行する

次のコードでは、C# からパイプラインを非同期的に呼び出す方法を示します。 部分的なコード スニペットで示されているのは、呼び出しの構造だけであり、Microsoft のサンプルの一部ではありません。 完全なクラスやエラー処理は示されていません。

[DataContract]
public class SubmitPipelineRunRequest
{
    [DataMember]
    public string ExperimentName { get; set; }

    [DataMember]
    public string Description { get; set; }

    [DataMember(IsRequired = false)]
    public IDictionary<string, string> ParameterAssignments { get; set; }
}

// ... in its own class and method ... 
const string RestEndpoint = "your-pipeline-endpoint";

using (HttpClient client = new HttpClient())
{
    var submitPipelineRunRequest = new SubmitPipelineRunRequest()
    {
        ExperimentName = "YourExperimentName", 
        Description = "Asynchronous C# REST api call", 
        ParameterAssignments = new Dictionary<string, string>
        {
            {
                // Replace with your pipeline parameter keys and values
                "your-pipeline-parameter", "default-value"
            }
        }
    };

    string auth_key = "your-auth-key"; 
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", auth_key);

    // submit the job
    var requestPayload = JsonConvert.SerializeObject(submitPipelineRunRequest);
    var httpContent = new StringContent(requestPayload, Encoding.UTF8, "application/json");
    var submitResponse = await client.PostAsync(RestEndpoint, httpContent).ConfigureAwait(false);
    if (!submitResponse.IsSuccessStatusCode)
    {
        await WriteFailedResponse(submitResponse); // ... method not shown ...
        return;
    }

    var result = await submitResponse.Content.ReadAsStringAsync().ConfigureAwait(false);
    var obj = JObject.Parse(result);
    // ... use `obj` dictionary to access results
}

Java を使用して発行されたパイプラインを実行する

次のコードでは、認証を必要とするパイプラインの呼び出しを示します (「Azure Machine Learning のリソースとワークフローの認証を設定する」を参照してください)。 パイプラインがパブリックにデプロイされている場合は、authKey を生成する呼び出しは必要ありません。 部分的なコード スニペットには、Java クラスと例外処理の定型コードは示されていません。 コードでは、空の Optional を返す可能性のある関数を連結するために Optional.flatMap が使用されています。 flatMap を使用すると、コードが短縮され、明確になりますが、getRequestBody() により例外が受け入れられることに注意してください。

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
// JSON library
import com.google.gson.Gson;

String scoringUri = "scoring-endpoint";
String tenantId = "your-tenant-id";
String clientId = "your-client-id";
String clientSecret = "your-client-secret";
String resourceManagerUrl = "https://management.azure.com";
String dataToBeScored = "{ \"ExperimentName\" : \"My_Pipeline\", \"ParameterAssignments\" : { \"pipeline_arg\" : \"20\" }}";

HttpClient client = HttpClient.newBuilder().build();
Gson gson = new Gson();

HttpRequest tokenAuthenticationRequest = tokenAuthenticationRequest(tenantId, clientId, clientSecret, resourceManagerUrl);
Optional<String> authBody = getRequestBody(client, tokenAuthenticationRequest);
Optional<String> authKey = authBody.flatMap(body -> Optional.of(gson.fromJson(body, AuthenticationBody.class).access_token);;
Optional<HttpRequest> scoringRequest = authKey.flatMap(key -> Optional.of(scoringRequest(key, scoringUri, dataToBeScored)));
Optional<String> scoringResult = scoringRequest.flatMap(req -> getRequestBody(client, req));
// ... etc (`scoringResult.orElse()`) ... 

static HttpRequest tokenAuthenticationRequest(String tenantId, String clientId, String clientSecret, String resourceManagerUrl)
{
    String authUrl = String.format("https://login.microsoftonline.com/%s/oauth2/token", tenantId);
    String clientIdParam = String.format("client_id=%s", clientId);
    String resourceParam = String.format("resource=%s", resourceManagerUrl);
    String clientSecretParam = String.format("client_secret=%s", clientSecret);

    String bodyString = String.format("grant_type=client_credentials&%s&%s&%s", clientIdParam, resourceParam, clientSecretParam);

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(authUrl))
        .POST(HttpRequest.BodyPublishers.ofString(bodyString))
        .build();
    return request;
}

static HttpRequest scoringRequest(String authKey, String scoringUri, String dataToBeScored)
{
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(scoringUri))
        .header("Authorization", String.format("Token %s", authKey))
        .POST(HttpRequest.BodyPublishers.ofString(dataToBeScored))
        .build();
    return request;

}

static Optional<String> getRequestBody(HttpClient client, HttpRequest request) {
    try {
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            System.out.println(String.format("Unexpected server response %d", response.statusCode()));
            return Optional.empty();
        }
        return Optional.of(response.body());
    }catch(Exception x)
    {
        System.out.println(x.toString());
        return Optional.empty();
    }
}

class AuthenticationBody {
    String access_token;
    String token_type;
    int expires_in;
    String scope;
    String refresh_token;
    String id_token;
    
    AuthenticationBody() {}
}

再トレーニングせずにデータセットとデータパスを変更する

さまざまなデータセットとデータパスでトレーニングと推論を行うことができます。 たとえば、トレーニングは小さいデータセットで行い、推論は完全なデータセットで行うことができます。 データセットは、要求の json 引数で DataSetDefinitionValueAssignments キーを使用して切り替えることができます。 DataPathAssignments を使用してデータパスを切り替えます。 どちらの手法も似ています。

  1. パイプライン定義スクリプトで、データセットの PipelineParameter を作成します。 PipelineParameter から DatasetConsumptionConfig または DataPath を作成します。

    tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
    tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset)
    tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
    
  2. ML スクリプトで、Run.get_context().input_datasets を使用して、動的に指定されたデータセットにアクセスします。

    from azureml.core import Run
    
    input_tabular_ds = Run.get_context().input_datasets['tabular_dataset']
    dataframe = input_tabular_ds.to_pandas_dataframe()
    # ... etc ...
    

    ML スクリプトが、PipelineParameter (tabular_ds_param) の値ではなく DatasetConsumptionConfig (tabular_dataset) に指定された値にアクセスすることにご注意ください。

  3. パイプライン定義スクリプトで、PipelineScriptStep のパラメーターとして DatasetConsumptionConfig を設定します。

    train_step = PythonScriptStep(
        name="train_step",
        script_name="train_with_dataset.py",
        arguments=["--param1", tabular_ds_consumption],
        inputs=[tabular_ds_consumption],
        compute_target=compute_target,
        source_directory=source_directory)
    
    pipeline = Pipeline(workspace=ws, steps=[train_step])
    
  4. 推論 REST 呼び出しでデータセットを動的に切り替えるには、DataSetDefinitionValueAssignments を使用します。

    tabular_ds1 = Dataset.Tabular.from_delimited_files('path_to_training_dataset')
    tabular_ds2 = Dataset.Tabular.from_delimited_files('path_to_inference_dataset')
    ds1_id = tabular_ds1.id
    d22_id = tabular_ds2.id
    
    response = requests.post(rest_endpoint, 
                             headers=aad_token, 
                             json={
                                "ExperimentName": "MyRestPipeline",
                               "DataSetDefinitionValueAssignments": {
                                    "tabular_ds_param": {
                                        "SavedDataSetReference": {"Id": ds1_id #or ds2_id
                                    }}}})
    

この手法の完全な例は、「データセットと PipelineParameter の紹介」と「データパスと PipelineParameter の紹介」のノートブックにあります。

バージョン管理されたパイプライン エンドポイントを作成する

パイプライン エンドポイントは、背後にある発行された複数のパイプラインを使用して作成できます。 この手法を使用すると、反復処理を行って ML パイプラインを更新するときに、固定の REST エンドポイントを使用できます。

from azureml.pipeline.core import PipelineEndpoint

published_pipeline = PublishedPipeline.get(workspace=ws, id="My_Published_Pipeline_id")
pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name="PipelineEndpointTest",
                                            pipeline=published_pipeline, description="Test description Notebook")

パイプライン エンドポイントにジョブを送信する

パイプライン エンドポイントの既定のバージョンにジョブを送信できます。

pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment")
print(run_id)

また、特定のバージョンにジョブを送信することもできます。

run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment", pipeline_version="0")
print(run_id)

同じ操作は、REST API を使用して実行できます。

rest_endpoint = pipeline_endpoint_by_name.endpoint
response = requests.post(rest_endpoint, 
                         headers=aad_token, 
                         json={"ExperimentName": "PipelineEndpointExperiment",
                               "RunSource": "API",
                               "ParameterAssignments": {"1": "united", "2":"city"}})

Studio で発行されたパイプラインを使用する

Studio から発行されたパイプラインを実行することもできます。

  1. Azure Machine Learning Studio にサインインします。

  2. ワークスペースを表示します

  3. 左側で、 [エンドポイント] を選択します。

  4. 上部で、 [Pipeline endpoints](パイプライン エンドポイント) を選択します。 list of machine learning published pipelines

  5. 実行または使用する、またはパイプライン エンドポイントの以前の実行の結果を確認する、特定のパイプラインを選択します。

発行されたパイプラインを無効にする

発行済みパイプラインの一覧にパイプラインが表示されないようにするには、Studio または SDK でそれを無効にします。

# Get the pipeline by using its ID from Azure Machine Learning studio
p = PublishedPipeline.get(ws, id="068f4885-7088-424b-8ce2-eeb9ba5381a6")
p.disable()

p.enable() で再び有効にすることができます。 詳しくは、PublishedPipeline クラスのリファレンスを参照してください。

次のステップ