Spark ジョブでの MLflow モデルのデプロイと実行

この記事では、Spark ジョブで MLflow モデルをデプロイして実行し、大量のデータに対して、またはデータ ラングリング ジョブの一部として推論を実行する方法について説明します。

この例の概要

この例では、Azure Machine Learning に登録されている MLflow モデルを、マネージド Spark クラスター (プレビュー)、Azure Databricks、または Azure Synapse Analytics で実行されている Spark ジョブにデプロイして、大量のデータに対して推論を実行する方法を示します。

このモデルは、UCI Heart Disease Data Set をベースにしています。 このデータベースには 76 個の属性が含まれていますが、そのサブセットである 14 個を使っています。 このモデルは、患者の心臓病の存在を予測しようと試みるものです。 これは 0 (存在しない) から 1 (存在する) の整数値です。 トレーニングには XGBBoost 分類器が使われ、必要な前処理はすべて scikit-learn パイプラインとしてパッケージ化されているため、このモデルは生データから予測までを行うエンドツーエンドのパイプラインになっています。

この記事の情報は、azureml-examples リポジトリに含まれているコード サンプルを基にしています。 ファイルをコピーして貼り付けることなくコマンドをローカルで実行するには、リポジトリを複製し、ディレクトリを sdk/using-mlflow/deploy に変更します。

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

前提条件

この記事の手順に従う前に、次の前提条件が満たされていることをご確認ください。

  • MLflow SDK パッケージ mlflow と MLflow 用の Azure Machine Learning プラグイン azureml-mlflow をインストールします。

    pip install mlflow azureml-mlflow
    

    ヒント

    SQL ストレージ、サーバー、UI、またはデータ サイエンスの依存関係のない軽量 MLflow パッケージであるパッケージ mlflow-skinny を使用できます。 主に MLflow の追跡機能とログ機能を必要とし、デプロイを含む全機能はインポートしないユーザーには、mlflow-skinny が推奨されます。

  • Azure Machine Learning ワークスペース。 機械学習リソースの作成に関するチュートリアルに従って作成できます。

  • リモート追跡 (Azure Machine Learning の外部で実行されている実験の追跡) を実行している場合は、Azure Machine Learning ワークスペースの追跡 URI を指すように MLflow を構成します。 MLflow をワークスペースに接続する方法の詳細については、「Azure Machine Learning 用に MLflow を構成する」を参照してください。

  • ワークスペースに MLflow モデルが登録されている必要があります。 特に、この例では、Diabetes データセット用にトレーニングされたモデルを登録します。

ワークスペースに接続する

まず、モデルが登録されている Azure Machine Learning ワークスペースに接続しましょう。

追跡は既に構成されています。 MLflow を使用する場合は、既定の資格情報も使用されます。

モデルを登録する

推論を実行するには、Azure Machine Learning レジストリに登録されたモデルが必要です。 この場合、リポジトリにモデルのローカル コピーが既にあるので、ワークスペースのレジストリにモデルを発行するだけで済みます。 デプロイ対象のモデルが既に登録されている場合は、この手順をスキップできます。

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

または、モデルが実行内でログに記録された場合は、モデルを直接登録できます。

ヒント

モデルを登録するには、モデルが格納されている場所を把握する必要があります。 MLflow の autolog 機能を使用する場合は、使用されるモデルの種類とフレームワークによってパスが異なります。 ジョブの出力を調べて、このフォルダーの名前を特定することをお勧めします。 MLModel という名前のファイルが含まれているフォルダーを探すことができます。 log_model を使用して手動でモデルを記録する場合は、対象のメソッドにこのパスを引数として渡します。 たとえば、mlflow.sklearn.log_model(my_model, "classifier") を使用してモデルを記録する場合、モデルが格納されるパスは classifier です。

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

注意

パス MODEL_PATH は、モデルが実行で格納された場所です。


スコア付けする入力データを取得する

ジョブで実行する入力データが必要です。 この例では、インターネットからサンプル データをダウンロードし、Spark クラスターで使用される共有ストレージに配置します。

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

クラスター全体で使用できるマウントされたストレージ アカウントにデータを移動します。

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

重要

前のコードでは dbutils を使用しています。これは Azure Databricks クラスターで使用できるツールです。 使用しているプラットフォームに応じて、適切なツールを使用します。

その後、入力データは次のフォルダーに配置されます。

input_data_path = "dbfs:/data"

Spark クラスターでモデルを実行する

次のセクションでは、Spark ジョブで Azure Machine Learning に登録されている MLflow モデルを実行する方法について説明します。

  1. クラスターに次のライブラリがインストールされていることを確認します。

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. ノートブックを使用して、Azure Machine Learning に登録された MLflow モデルを使用してスコアリング ルーチンを作成する方法を示します。 ノートブックを作成し、既定の言語として PySpark を使用します。

  3. 必要な名前空間をインポートします。

    import mlflow
    import pyspark.sql.functions as f
    
  4. モデル URI を構成します。 次の URI では、heart-classifier という名前のモデルの最新バージョンが取り込まれます。

    model_uri = "models:/heart-classifier/latest"
    
  5. モデルを UDF 関数として読み込みます。 ユーザー定義関数 (UDF) はユーザーによって定義された関数であり、ユーザー環境でカスタム ロジックを再利用できます。

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    ヒント

    predict() 関数によって返される型を制御するには、 引数 result_type を使用します。

  6. スコア付けするデータを読み取ります。

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    この場合、入力データは CSV 形式で、dbfs:/data/ フォルダーに配置されます。 また、このデータセットには予測するターゲット変数が含まれているため、列 target を削除します。 運用環境のシナリオでは、データにこの列はありません。

  7. 関数 predict_function を実行し、新しい列に予測を配置します。 この場合は、予測を predictions 列に配置します。

    df.withColumn("predictions", score_function(*df.columns))
    

    ヒント

    predict_function では、必要な列を引数として受け取ります。 この例では、データ フレームのすべての列がモデルによって予期されているため、df.columns が使用されます。 モデルで列のサブセットが必要な場合は、それらを手動で導入できます。 モデルにシグネチャがある場合、入力と、予期される型の間で型の互換性がある必要があります。

  8. 予測をストレージに書き戻すことができます。

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

Azure Machine Learning のスタンドアロン Spark ジョブでモデルを実行する

Azure Machine Learning では、スタンドアロン Spark ジョブの作成と、Azure Machine Learning パイプラインで使用できる再利用可能な Spark コンポーネントの作成がサポートされています。 この例では、Azure Machine Learning のスタンドアロン Spark ジョブで実行され、MLflow モデルを実行して推論を実行するスコアリング ジョブをデプロイします。

Note

Azure Machine Learning の Spark ジョブの詳細については、「Azure Machine Learning で Spark ジョブを送信する (プレビュー)」を参照してください。

  1. Spark ジョブには、引数を受け取る Python スクリプトが必要です。 次のようにスコアリング スクリプトを作成します。

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    上記のスクリプトでは、--model--input_data--scored_data の 3 つの引数を受け取ります。 最初の 2 つは入力であり、実行するモデルと入力データを表します。最後の 1 つは出力であり、予測が配置される出力フォルダーです。

    ヒント

    Python パッケージのインストール: 前のスコアリング スクリプトでは MLflow モデルを UDF 関数に読み込みますが、 パラメーター env_manager="conda"を示しています。 このパラメーターを設定すると、UDF 関数のみが実行される分離環境で、モデル定義で指定されている必要なパッケージが MLflow によって復元されます。 詳細については、mlflow.pyfunc.spark_udf のドキュメントを参照してください。

  2. ジョブ定義を作成します。

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    ヒント

    アタッチされた Synapse Spark プールを使うには、前述のサンプル YAML 仕様ファイルで resources プロパティではなく compute プロパティを定義します。

  3. 上の YAML ファイルを az ml job create コマンドの --file パラメーターで指定して、次のようにスタンドアロン Spark ジョブを作成できます。

    az ml job create -f mlflow-score-spark-job.yml
    

次のステップ