Поделиться через


Развертывание и запуск моделей MLflow в заданиях Spark

Из этой статьи вы узнаете, как развернуть и запустить модель MLflow в заданиях Spark, чтобы выполнить вывод по большим объемам данных или как часть заданий обработки данных.

Сведения об этом примере

В этом примере показано, как развернуть модель MLflow, зарегистрированную в Машинное обучение Azure задания Spark, выполняемые в управляемых кластерах Spark (предварительная версия), Azure Databricks или Azure Synapse Analytics, для вывода больших объемов данных.

Модель основана на наборе данных болезни сердца UCI. База данных содержит 76 атрибутов, но мы используем подмножество из 14 из них. Модель пытается предсказать наличие сердечно-сосудистых заболеваний у пациента. Целочисленное значение от 0 (нет присутствия) до 1 (присутствие). Он был обучен с помощью XGBBoost классификатора, и все необходимые предварительной обработки были упакованы в виде scikit-learn конвейера, что делает эту модель сквозным конвейером, который переходит от необработанных данных к прогнозам.

Сведения в этой статье основаны на примерах кода, имеющихся в репозитории azureml-examples. Для локального выполнения команд без необходимости копирования и вставки файлов клонируйте репозиторий, а затем измените каталоги sdk/using-mlflow/deployна .

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

Необходимые компоненты

Перед выполнением действий, описанных в этой статье, убедитесь, что выполнены следующие необходимые условия:

  • Установите пакет mlflow SDK MLflow и подключаемый модуль Машинное обучение Azure azureml-mlflow для MLflow:

    pip install mlflow azureml-mlflow
    

    Совет

    Вы можете использовать mlflow-skinny пакет, который является упрощенным пакетом MLflow без хранилища SQL, сервера, пользовательского интерфейса или зависимостей для обработки и анализа данных. mlflow-skinny рекомендуется для пользователей, которым в первую очередь нужны возможности отслеживания И ведения журнала MLflow, не импортируя полный набор функций, включая развертывания.

  • Рабочая область Машинного обучения Azure. Сведения о создании рабочей области см. в руководстве по созданию ресурсов машинного обучения. Просмотрите разрешения доступа, необходимые для выполнения операций MLflow в рабочей области.

  • Если вы выполняете удаленное отслеживание (то есть отслеживайте эксперименты, выполняемые вне Машинное обучение Azure), настройте MLflow, чтобы указать URI отслеживания рабочей области Машинное обучение Azure. Дополнительные сведения о подключении MLflow к рабочей области см. в разделе "Настройка MLflow" для Машинное обучение Azure.

  • В рабочей области должна быть зарегистрирована модель MLflow. В частности, в этом примере будет зарегистрирована модель, обученная для набора данных диабета.

Подключение к рабочей области

Сначала давайте подключимся к рабочей области Машинное обучение Azure, где зарегистрирована модель.

Отслеживание уже настроено для вас. Учетные данные по умолчанию также будут использоваться при работе с MLflow.

Регистрация модели

Нам нужна модель, зарегистрированная в реестре Машинное обучение Azure для выполнения вывода. В этом случае у нас уже есть локальная копия модели в репозитории, поэтому нам нужно опубликовать модель в реестре в рабочей области. Этот шаг можно пропустить, если модель, на который вы пытаетесь развернуть, уже зарегистрирована.

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

Кроме того, если модель была зарегистрирована внутри запуска, ее можно зарегистрировать напрямую.

Совет

Чтобы зарегистрировать модель, необходимо знать расположение, в котором хранится модель. Если вы используете функцию autolog MLflow, путь будет зависеть от типа и платформы используемой модели. Мы рекомендуем проверить выходные данные заданий, чтобы определить имя этой папки. Вы можете найти папку, содержащую файл с именем MLModel. Если вы регистрируете модели вручную с использованием log_model, то путем будет аргумент, который вы передаете такому методу. Например, если вы регистрируете модель с помощью mlflow.sklearn.log_model(my_model, "classifier"), то путь, в котором хранится classifierмодель.

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

Примечание.

Путь MODEL_PATH — это расположение, где модель хранилась при выполнении.


Получение входных данных для оценки

Для выполнения или заданий нам потребуется некоторые входные данные. В этом примере мы скачайте примеры данных из Интернета и поместим его в общее хранилище, используемое кластером Spark.

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

Переместите данные в подключенную учетную запись хранения, доступную всему кластеру.

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

Внимание

Предыдущий код использует dbutilsинструмент, доступный в кластере Azure Databricks. Используйте соответствующее средство в зависимости от используемой платформы.

Затем входные данные помещаются в следующую папку:

input_data_path = "dbfs:/data"

Запуск модели в кластерах Spark

В следующем разделе объясняется, как запускать модели MLflow, зарегистрированные в Машинное обучение Azure в заданиях Spark.

  1. Убедитесь, что в кластере установлены следующие библиотеки:

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. Мы будем использовать записную книжку для демонстрации того, как создать подпрограмму оценки с моделью MLflow, зарегистрированной в Машинное обучение Azure. Создайте записную книжку и используйте PySpark в качестве языка по умолчанию.

  3. Импортируйте необходимые пространства имен:

    import mlflow
    import pyspark.sql.functions as f
    
  4. Настройте универсальный код ресурса (URI) модели. Следующий универсальный код ресурса (URI) содержит модель с именем heart-classifier в последней версии.

    model_uri = "models:/heart-classifier/latest"
    
  5. Загрузите модель в качестве функции UDF. Определяемая пользователем функция (UDF) — это функция, определяемая пользователем, что позволяет повторно использовать пользовательскую логику в пользовательской среде.

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    Совет

    Используйте аргумент result_type для управления типом, возвращаемым функцией predict() .

  6. Считывайте данные, которые вы хотите оценить:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    В нашем случае входные данные помещаются в CSV формат и помещаются в папку dbfs:/data/. Мы также сбрасываем столбец target , так как этот набор данных содержит целевую переменную для прогнозирования. В рабочих сценариях данные не будут иметь этот столбец.

  7. Запустите функцию predict_function и поместите прогнозы в новый столбец. В этом случае мы помещаем прогнозы в столбец predictions.

    df.withColumn("predictions", score_function(*df.columns))
    

    Совет

    Получает predict_function в качестве аргументов необходимые столбцы. В нашем случае все столбцы кадра данных ожидаются моделью и поэтому df.columns используются. Если для модели требуется подмножество столбцов, их можно представить вручную. Если у модели есть сигнатура, типы должны быть совместимы между входными и ожидаемыми типами.

  8. Прогнозы можно записать обратно в хранилище:

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

Запуск модели в автономном задании Spark в Машинное обучение Azure

Машинное обучение Azure поддерживает создание изолированного задания Spark и создание повторно используемого компонента Spark, который можно использовать в конвейерах Машинное обучение Azure. В этом примере мы развернем задание оценки, которое выполняется в автономном задании Spark Машинное обучение Azure и запускает модель MLflow для выполнения вывода.

Примечание.

Дополнительные сведения о заданиях Spark в Машинное обучение Azure см. в разделе "Отправка заданий Spark" в Машинное обучение Azure (предварительная версия).

  1. Для задания Spark требуется скрипт Python, который принимает аргументы. Создайте скрипт оценки:

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    Приведенный выше скрипт принимает три аргумента --modelи --scored_data--input_data . Первые два являются входными данными и представляют модель, которую мы хотим запустить, а входные данные — выходные данные, и это папка вывода, в которой будут размещаться прогнозы.

    Совет

    Установка пакетов Python: предыдущий скрипт оценки загружает модель MLflow в функцию UDF, но указывает параметр env_manager="conda". Если этот параметр задан, MLflow восстановит необходимые пакеты, указанные в определении модели в изолированной среде, где выполняется только функция UDF. Дополнительные сведения см mlflow.pyfunc.spark_udf . в документации.

  2. Создайте определение задания:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    Совет

    Чтобы использовать присоединенный пул Synapse Spark, определите compute свойство в примере файла спецификации YAML, показанного resources выше вместо свойства.

  3. Приведенные выше файлы YAML можно использовать в az ml job create команде с --file параметром для создания автономного задания Spark, как показано ниже.

    az ml job create -f mlflow-score-spark-job.yml
    

Следующие шаги