Share via


MLflow-modellen implementeren en uitvoeren in Spark-taken

In dit artikel leert u hoe u uw MLflow-model implementeert en uitvoert in Spark-taken om deductie uit te voeren over grote hoeveelheden gegevens of als onderdeel van gegevens-wrangling-taken.

Over dit voorbeeld

In dit voorbeeld ziet u hoe u een MLflow-model kunt implementeren dat is geregistreerd in Azure Machine Learning voor Spark-taken die worden uitgevoerd in beheerde Spark-clusters (preview), Azure Databricks of Azure Synapse Analytics, om deductie uit te voeren op grote hoeveelheden gegevens.

Het model is gebaseerd op de UCI Heart Disease Data Set. De database bevat 76 kenmerken, maar we gebruiken een subset van 14 kenmerken. Het model probeert de aanwezigheid van hart- en vaatziekten bij een patiënt te voorspellen. Het is een geheel getal van 0 (geen aanwezigheid) tot 1 (aanwezigheid). Het is getraind met behulp van een XGBBoost classificatie en alle vereiste voorverwerking is verpakt als een scikit-learn pijplijn, waardoor dit model een end-to-end-pijplijn is die van onbewerkte gegevens naar voorspellingen gaat.

De informatie in dit artikel is gebaseerd op codevoorbeelden in de opslagplaats azureml-examples . Als u de opdrachten lokaal wilt uitvoeren zonder bestanden te hoeven kopiëren/plakken, kloont u de opslagplaats en wijzigt u de mappen sdk/using-mlflow/deployin .

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

Vereisten

Voordat u de stappen in dit artikel volgt, moet u ervoor zorgen dat u over de volgende vereisten beschikt:

  • Installeer het MLflow SDK-pakket mlflow en de Azure Machine Learning-invoegtoepassing voor MLflow azureml-mlflow.

    pip install mlflow azureml-mlflow
    

    Tip

    U kunt het mlflow-skinny pakket gebruiken. Dit is een lichtgewicht MLflow-pakket zonder SQL-opslag-, server-, UI- of data science-afhankelijkheden. mlflow-skinny wordt aanbevolen voor gebruikers die voornamelijk de mogelijkheden voor tracering en logboekregistratie van MLflow nodig hebben zonder de volledige suite met functies, waaronder implementaties, te importeren.

  • Een Azure Machine Learning-werkruimte. U kunt er een maken door de zelfstudie Machine Learning-resources maken te volgen.

  • Als u externe tracering uitvoert (dat wil gezegd, experimenten bijhouden die buiten Azure Machine Learning worden uitgevoerd), configureert u MLflow zodat deze verwijst naar de tracerings-URI van uw Azure Machine Learning-werkruimte. Zie MLflow configureren voor Azure Machine Learning voor meer informatie over het verbinden van MLflow met uw werkruimte.

  • U moet een MLflow-model hebben geregistreerd in uw werkruimte. In dit voorbeeld wordt met name een model geregistreerd dat is getraind voor de gegevensset Diabetes.

Verbinding maken met uw werkruimte

Eerst gaan we verbinding maken met de Azure Machine Learning-werkruimte waarin uw model is geregistreerd.

Tracering is al voor u geconfigureerd. Uw standaardreferenties worden ook gebruikt bij het werken met MLflow.

Het model registreren

We hebben een model nodig dat is geregistreerd in het Azure Machine Learning-register om deductie uit te voeren. In dit geval hebben we al een lokale kopie van het model in de opslagplaats, dus we hoeven het model alleen in het register in de werkruimte te publiceren. U kunt deze stap overslaan als het model dat u probeert te implementeren al is geregistreerd.

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

Als uw model is geregistreerd in een uitvoering, kunt u het ook rechtstreeks registreren.

Tip

Als u het model wilt registreren, moet u weten waar het model is opgeslagen. Als u de functie van MLflow gebruikt autolog , is het pad afhankelijk van het type en framework van het gebruikte model. We raden u aan om de uitvoer van taken te controleren om te bepalen welke naam deze map heeft. U kunt zoeken naar de map met een bestand met de naam MLModel. Als u uw modellen handmatig log_modelgebruikt, is het pad het argument dat u aan een dergelijke methode doorgeeft. Als u bijvoorbeeld het model aanmeldt met behulp van mlflow.sklearn.log_model(my_model, "classifier"), dan is classifierhet pad waar het model is opgeslagen.

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

Notitie

Het pad MODEL_PATH is de locatie waar het model is opgeslagen in de uitvoering.


Invoergegevens ophalen om te scoren

We hebben enkele invoergegevens nodig om uit te voeren of taken uit te voeren. In dit voorbeeld downloaden we voorbeeldgegevens van internet en plaatsen we deze in een gedeelde opslag die wordt gebruikt door het Spark-cluster.

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

Verplaats de gegevens naar een gekoppeld opslagaccount dat beschikbaar is voor het hele cluster.

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

Belangrijk

De vorige code maakt gebruik van dbutilseen hulpprogramma dat beschikbaar is in een Azure Databricks-cluster. Gebruik het juiste hulpprogramma, afhankelijk van het platform dat u gebruikt.

De invoergegevens worden vervolgens in de volgende map geplaatst:

input_data_path = "dbfs:/data"

Het model uitvoeren in Spark-clusters

In de volgende sectie wordt uitgelegd hoe u MLflow-modellen uitvoert die zijn geregistreerd in Azure Machine Learning in Spark-taken.

  1. Zorg ervoor dat de volgende bibliotheken zijn geïnstalleerd in het cluster:

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. We gebruiken een notebook om te laten zien hoe u een scoreroutine maakt met een MLflow-model dat is geregistreerd in Azure Machine Learning. Maak een notebook en gebruik PySpark als standaardtaal.

  3. Importeer de vereiste naamruimten:

    import mlflow
    import pyspark.sql.functions as f
    
  4. Configureer de model-URI. De volgende URI brengt een model met de naam heart-classifier in de nieuwste versie.

    model_uri = "models:/heart-classifier/latest"
    
  5. Laad het model als een UDF-functie. Een door de gebruiker gedefinieerde functie (UDF) is een functie die door een gebruiker is gedefinieerd, zodat aangepaste logica opnieuw kan worden gebruikt in de gebruikersomgeving.

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    Tip

    Gebruik het argument result_type om het type te bepalen dat door de predict() functie wordt geretourneerd.

  6. Lees de gegevens die u wilt scoren:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    In ons geval hebben de invoergegevens de CSV indeling en worden ze in de map dbfs:/data/geplaatst. We verwijderen ook de kolom target omdat deze gegevensset de doelvariabele bevat die moet worden voorspeld. In productiescenario's hebben uw gegevens deze kolom niet.

  7. Voer de functie predict_function uit en plaats de voorspellingen op een nieuwe kolom. In dit geval plaatsen we de voorspellingen in de kolom predictions.

    df.withColumn("predictions", score_function(*df.columns))
    

    Tip

    De predict_function kolommen worden als argumenten ontvangen. In ons geval worden alle kolommen van het gegevensframe verwacht door het model en daarom df.columns gebruikt. Als voor uw model een subset van de kolommen is vereist, kunt u ze handmatig introduceren. Als u een handtekening hebt, moeten typen compatibel zijn tussen invoer en verwachte typen.

  8. U kunt uw voorspellingen terugschrijven naar de opslag:

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

Het model uitvoeren in een zelfstandige Spark-taak in Azure Machine Learning

Azure Machine Learning biedt ondersteuning voor het maken van een zelfstandige Spark-taak en het maken van een herbruikbaar Spark-onderdeel dat kan worden gebruikt in Azure Machine Learning-pijplijnen. In dit voorbeeld implementeren we een scoretaak die wordt uitgevoerd in een zelfstandige Spark-taak van Azure Machine Learning en wordt een MLflow-model uitgevoerd om deductie uit te voeren.

Notitie

Zie Spark-taken verzenden in Azure Machine Learning (preview) voor meer informatie over Spark-taken in Azure Machine Learning.

  1. Voor een Spark-taak is een Python-script vereist dat argumenten gebruikt. Een scorescript maken:

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    Het bovenstaande script heeft drie argumenten --model, --input_data en --scored_data. De eerste twee zijn invoer en vertegenwoordigen het model dat we willen uitvoeren en de invoergegevens, de laatste is een uitvoer en het is de uitvoermap waarin voorspellingen worden geplaatst.

    Tip

    Installatie van Python-pakketten: het vorige scorescript laadt het MLflow-model in een UDF-functie, maar geeft de parameter env_manager="conda"aan. Wanneer deze parameter is ingesteld, worden de vereiste pakketten hersteld zoals opgegeven in de modeldefinitie in een geïsoleerde omgeving waar alleen de UDF-functie wordt uitgevoerd. Zie de documentatie voor meer informatie mlflow.pyfunc.spark_udf .

  2. Een taakdefinitie maken:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    Tip

    Als u een gekoppelde Synapse Spark-pool wilt gebruiken, definieert u compute de eigenschap in het voorbeeldbestand van de YAML-specificatie dat hierboven wordt weergegeven in plaats van resources de eigenschap.

  3. De YAML-bestanden die hierboven worden weergegeven, kunnen worden gebruikt in de az ml job create opdracht, met de --file parameter, om een zelfstandige Spark-taak te maken, zoals wordt weergegeven:

    az ml job create -f mlflow-score-spark-job.yml
    

Volgende stappen