Distribuera och köra MLflow-modeller i Spark-jobb

I den här artikeln får du lära dig hur du distribuerar och kör din MLflow-modell i Spark-jobb för att utföra slutsatsdragning över stora mängder data eller som en del av dataomvandlingsjobb.

Om det här exemplet

Det här exemplet visar hur du kan distribuera en MLflow-modell som registrerats i Azure Machine Learning till Spark-jobb som körs i hanterade Spark-kluster (förhandsversion), Azure Databricks eller Azure Synapse Analytics för att utföra slutsatsdragning över stora mängder data.

Modellen är baserad på datauppsättningen för UCI-hjärtsjukdomar. Databasen innehåller 76 attribut, men vi använder en delmängd av 14 av dem. Modellen försöker förutsäga förekomsten av hjärtsjukdomar hos en patient. Det är heltalsvärde från 0 (ingen närvaro) till 1 (närvaro). Den har tränats med en XGBBoost klassificerare och all nödvändig förbearbetning har paketerats som en scikit-learn pipeline, vilket gör den här modellen till en pipeline från slutpunkt till slutpunkt som går från rådata till förutsägelser.

Informationen i den här artikeln baseras på kodexempel som finns i lagringsplatsen azureml-examples . Om du vill köra kommandona lokalt utan att behöva kopiera/klistra in filer klonar du lagringsplatsen och ändrar sedan katalogerna till sdk/using-mlflow/deploy.

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

Förutsättningar

Innan du följer stegen i den här artikeln kontrollerar du att du har följande förutsättningar:

  • Installera MLflow SDK-paketet mlflow och Azure Machine Learning-plugin-programmet för MLflow azureml-mlflow.

    pip install mlflow azureml-mlflow
    

    Dricks

    Du kan använda mlflow-skinny paketet, som är ett enkelt MLflow-paket utan SQL-lagring, server, användargränssnitt eller datavetenskapsberoenden. mlflow-skinny rekommenderas för användare som främst behöver MLflows spårnings- och loggningsfunktioner utan att importera hela sviten med funktioner, inklusive distributioner.

  • En Azure Machine Learning-arbetsyta. Du kan skapa en genom att följa självstudien Skapa maskininlärningsresurser.

  • Om du utför fjärrspårning (d.v.s. spårningsexperiment som körs utanför Azure Machine Learning) konfigurerar du MLflow så att det pekar på spårnings-URI:n för din Azure Machine Learning-arbetsyta. Mer information om hur du ansluter MLflow till din arbetsyta finns i Konfigurera MLflow för Azure Machine Learning.

  • Du måste ha en MLflow-modell registrerad på din arbetsyta. I synnerhet registrerar det här exemplet en modell som tränats för datauppsättningen Diabetes.

Anslut till din arbetsyta

Först ska vi ansluta till Azure Machine Learning-arbetsytan där din modell är registrerad.

Spårning har redan konfigurerats åt dig. Dina standardautentiseringsuppgifter används också när du arbetar med MLflow.

Registrera modellen

Vi behöver en modell registrerad i Azure Machine Learning-registret för att utföra slutsatsdragning. I det här fallet har vi redan en lokal kopia av modellen på lagringsplatsen, så vi behöver bara publicera modellen till registret på arbetsytan. Du kan hoppa över det här steget om den modell som du försöker distribuera redan är registrerad.

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

Om din modell loggades inuti en körning kan du också registrera den direkt.

Dricks

Om du vill registrera modellen måste du veta var modellen har lagrats. Om du använder autolog funktionen för MLflow beror sökvägen på typen och ramverket för den modell som används. Vi rekommenderar att du kontrollerar jobbens utdata för att identifiera vilket som är namnet på den här mappen. Du kan leta efter mappen som innehåller en fil med namnet MLModel. Om du loggar dina modeller manuellt med hjälp av log_modelär sökvägen det argument som du skickar till en sådan metod. Om du till exempel loggar modellen med är mlflow.sklearn.log_model(my_model, "classifier")sökvägen där modellen lagras classifier.

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

Kommentar

Sökvägen MODEL_PATH är den plats där modellen har lagrats i körningen.


Hämta indata för poäng

Vi behöver lite indata att köra eller jobb på. I det här exemplet laddar vi ned exempeldata från Internet och placerar dem i en delad lagring som används av Spark-klustret.

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

Flytta data till ett monterat lagringskonto som är tillgängligt för hela klustret.

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

Viktigt!

Den tidigare koden använder dbutils, vilket är ett verktyg som är tillgängligt i Azure Databricks-klustret. Använd lämpligt verktyg beroende på vilken plattform du använder.

Indata placeras sedan i följande mapp:

input_data_path = "dbfs:/data"

Kör modellen i Spark-kluster

I följande avsnitt beskrivs hur du kör MLflow-modeller som registrerats i Azure Machine Learning i Spark-jobb.

  1. Kontrollera att följande bibliotek är installerade i klustret:

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. Vi använder en notebook-fil för att visa hur du skapar en bedömningsrutin med en MLflow-modell registrerad i Azure Machine Learning. Skapa en notebook-fil och använd PySpark som standardspråk.

  3. Importera de nödvändiga namnrymderna:

    import mlflow
    import pyspark.sql.functions as f
    
  4. Konfigurera modell-URI:n. Följande URI ger en modell med namnet heart-classifier i den senaste versionen.

    model_uri = "models:/heart-classifier/latest"
    
  5. Läs in modellen som en UDF-funktion. En användardefinierad funktion (UDF) är en funktion som definieras av en användare, vilket gör att anpassad logik kan återanvändas i användarmiljön.

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    Dricks

    Använd argumentet result_type för att styra vilken typ som returneras av predict() funktionen.

  6. Läs de data som du vill poängsätta:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    I vårt fall är indata i CSV formatet och placeras i mappen dbfs:/data/. Vi släpper också kolumnen target eftersom den här datamängden innehåller målvariabeln som ska förutsägas. I produktionsscenarier har dina data inte den här kolumnen.

  7. Kör funktionen predict_function och placera förutsägelserna på en ny kolumn. I det här fallet placerar vi förutsägelserna i kolumnen predictions.

    df.withColumn("predictions", score_function(*df.columns))
    

    Dricks

    Tar predict_function emot som argument de kolumner som krävs. I vårt fall förväntas alla kolumner i dataramen av modellen och används därför df.columns . Om din modell kräver en delmängd av kolumnerna kan du introducera dem manuellt. Om modellen har en signatur måste typer vara kompatibla mellan indata och förväntade typer.

  8. Du kan skriva tillbaka dina förutsägelser till lagringen:

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

Köra modellen i ett fristående Spark-jobb i Azure Machine Learning

Azure Machine Learning har stöd för att skapa ett fristående Spark-jobb och skapa en återanvändbar Spark-komponent som kan användas i Azure Machine Learning-pipelines. I det här exemplet distribuerar vi ett bedömningsjobb som körs i ett fristående Spark-jobb i Azure Machine Learning och kör en MLflow-modell för att utföra slutsatsdragning.

Kommentar

Mer information om Spark-jobb i Azure Machine Learning finns i Skicka Spark-jobb i Azure Machine Learning (förhandsversion).

  1. Ett Spark-jobb kräver ett Python-skript som tar argument. Skapa ett bedömningsskript:

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    Skriptet ovan tar tre argument --model, --input_data och --scored_data. De första två är indata och representerar den modell som vi vill köra och indata, den sista är en utdata och det är utdatamappen där förutsägelser kommer att placeras.

    Dricks

    Installation av Python-paket: Det tidigare bedömningsskriptet läser in MLflow-modellen i en UDF-funktion, men anger parametern env_manager="conda". När den här parametern har angetts återställer MLflow de nödvändiga paketen som anges i modelldefinitionen i en isolerad miljö där endast UDF-funktionen körs. Mer information finns i mlflow.pyfunc.spark_udf dokumentationen.

  2. Skapa en jobbdefinition:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    Dricks

    Om du vill använda en bifogad Synapse Spark-pool definierar du compute egenskapen i yaml-exempelspecifikationsfilen som visas ovan i stället för resources egenskapen.

  3. YAML-filerna som visas ovan kan användas i az ml job create kommandot med parametern --file för att skapa ett fristående Spark-jobb enligt följande:

    az ml job create -f mlflow-score-spark-job.yml
    

Nästa steg