Share via


Implantar e executar modelos do MLflow em trabalhos do Spark

Neste artigo, saiba como implantar e executar seu modelo do MLflow em trabalhos do Spark para executar inferência em grandes quantidades de dados ou como parte de trabalhos de estruturação de dados.

Sobre este exemplo

Este exemplo mostra como você pode implantar um modelo do MLflow registrado no Azure Machine Learning em trabalhos do Spark em execução em clusters do Spark gerenciados (versão prévia), no Azure Databricks ou no Azure Synapse Analytics para executar inferência em grandes quantidades de dados.

O modelo é baseado no conjunto de dados UCI Heart Disease. O banco de dados contém 76 atributos, mas estamos usando um subconjunto de 14 deles. O modelo tenta prever a presença de doenças cardíacas em um paciente. Ele é inteiro com valor 0 (sem presença) a 1 (presença). Ele foi treinado usando um classificador XGBBoost e todo o pré-processamento necessário foi empacotado como um pipeline scikit-learn, tornando esse modelo um pipeline de ponta a ponta que vai de dados brutos a previsões.

As informações deste artigo se baseiam em exemplos de código contidos no repositório azureml-examples. Para executar os comandos localmente sem precisar copiar/colar arquivos, clone o repositório e altere os diretórios para sdk/using-mlflow/deploy.

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

Pré-requisitos

Antes de seguir as etapas neste artigo, verifique se você tem os seguintes pré-requisitos:

  • Instale o pacote mlflow do SDK do MLflow e o plug-in do Azure Machine Learning para MLflow azureml-mlflow.

    pip install mlflow azureml-mlflow
    

    Dica

    Você pode usar o pacote mlflow-skinny, que é um pacote MLflow leve sem dependências de armazenamento, servidor, interface do usuário ou ciência de dados do SQL. mlflow-skinny é recomendado para usuários que precisam principalmente dos recursos de rastreamento e registro em log do MLflow sem importar o conjunto completo de recursos, incluindo implantações.

  • Um Workspace do Azure Machine Learning. Você pode criar um seguindo o tutorial Criar recursos de machine learning.

  • Se você estiver executando o acompanhamento remoto (ou seja, acompanhando experimentos que estão sendo executados fora do Azure Machine Learning), configure o MLflow para apontar para o URI de acompanhamento do workspace do Azure Machine Learning. Para obter mais informações sobre como conectar o MLflow ao seu workspace, consulte Configurar o MLflow para o Azure Machine Learning.

  • Você deve ter um modelo do MLflow registrado em seu workspace. Particularmente, este exemplo registrará um modelo treinado para o Conjunto de dados Diabetes.

Conectar-se ao workspace

Primeiro, vamos nos conectar ao workspace do Azure Machine Learning no qual o modelo está registrado.

O acompanhamento já está configurado para você. Suas credenciais padrão também serão usadas ao trabalhar com o MLflow.

Registrar o modelo

Precisamos de um modelo registrado no registro do Azure Machine Learning para executar a inferência. Nesse caso, já temos uma cópia local do modelo no repositório, portanto, só precisamos publicar o modelo no registro no workspace. Você poderá pular essa etapa se o modelo que está tentando implantar já estiver registrado.

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

Como alternativa, se o modelo foi registrado dentro de uma execução, você pode registrá-lo diretamente.

Dica

Para registrar o modelo, você precisará saber onde ele foi armazenado. Se você estiver usando o recurso autolog do MLflow, o caminho dependerá do tipo e da estrutura do modelo que está sendo utilizado. É recomendável verificar a saída dos trabalhos para identificar qual é o nome dessa pasta. Você pode procurar a pasta que contém um arquivo nomeado MLModel. Se você estiver registrando os modelos manualmente usando log_model, o caminho será o argumento que você passará para esse método. Por exemplo, se você registrar o modelo usando mlflow.sklearn.log_model(my_model, "classifier"), o caminho em que o modelo está armazenado será classifier.

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

Observação

O caminho MODEL_PATH é o local em que o modelo foi armazenado na execução.


Obter dados de entrada para pontuar

Precisaremos de alguns dados de entrada para execução dos trabalhos. Neste exemplo, baixaremos dados de exemplo da Internet e os colocaremos em um armazenamento compartilhado usado pelo cluster do Spark.

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

Mova os dados para uma conta de armazenamento montada disponível para todo o cluster.

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

Importante

O código anterior usa dbutils, que é uma ferramenta disponível no cluster do Azure Databricks. Use a ferramenta apropriada dependendo da plataforma que você está usando.

Em seguida, os dados de entrada são colocados na seguinte pasta:

input_data_path = "dbfs:/data"

Executar o modelo em clusters do Spark

A seção a seguir explica como executar modelos do MLflow registrados no Azure Machine Learning em trabalhos do Spark.

  1. Verifique se as seguintes bibliotecas estão instaladas no cluster:

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. Usaremos um notebook para demonstrar como criar uma rotina de pontuação com um modelo do MLflow registrado no Azure Machine Learning. Crie um notebook e use PySpark como a linguagem padrão.

  3. Importe os namespaces necessários:

    import mlflow
    import pyspark.sql.functions as f
    
  4. Configure o URI do modelo. O URI a seguir traz um modelo chamado heart-classifier na versão mais recente.

    model_uri = "models:/heart-classifier/latest"
    
  5. Carregue o modelo como uma função UDF. Uma UDF (função definida pelo usuário) é uma função definida por um usuário, que permite que lógica personalizada seja reutilizada no ambiente do usuário.

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    Dica

    Use o argumento result_type para controlar o tipo retornado pela função predict().

  6. Leia os dados que deseja pontuar:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    Em nosso caso, os dados de entrada estão no formato CSV e foram colocados na pasta dbfs:/data/. Também estamos descartando a coluna target, pois esse conjunto de dados contém a variável de destino a ser prevista. Em cenários de produção, os dados não terão essa coluna.

  7. Execute a função predict_function e coloque as previsões em uma nova coluna. Nesse caso, estamos colocando as previsões na coluna predictions.

    df.withColumn("predictions", score_function(*df.columns))
    

    Dica

    O predict_function recebe como argumentos as colunas necessárias. Em nosso caso, todas as colunas do quadro de dados são esperadas pelo modelo e, portanto, df.columns é usado. Se o modelo exigir um subconjunto das colunas, você poderá apresentá-las manualmente. Se o modelo tiver uma assinatura, os tipos precisarão ser compatíveis entre entradas e tipos esperados.

  8. Você pode gravar as previsões de volta no armazenamento:

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

Executar o modelo em um trabalho autônomo do Spark no Azure Machine Learning

O Azure Machine Learning dá suporte à criação de um trabalho autônomo do Spark e à criação de um componente reutilizável do Spark que podem ser usados em pipelines do Azure Machine Learning. Neste exemplo, implantaremos um trabalho de pontuação que é executado no trabalho do Spark autônomo do Azure Machine Learning e executa um modelo do MLflow para executar a inferência.

Observação

Para saber mais sobre trabalhos do Spark no Azure Machine Learning, confira Enviar trabalhos do Spark no Azure Machine Learning (versão prévia).

  1. Um trabalho do Spark requer um script de Python que usa argumentos. Criar um script de pontuação

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    O script acima usa três argumentos; --model, --input_data e --scored_data. Os dois primeiros são entradas e representam o modelo que queremos executar e os dados de entrada, o último é uma saída e é a pasta de saída em que as previsões serão colocadas.

    Dica

    Instalação de pacotes de Python: o script de pontuação anterior carrega o modelo do MLflow em uma função UDF, mas indica o parâmetro env_manager="conda". Quando esse parâmetro for definido, o MLflow restaurará os pacotes necessários conforme especificado na definição do modelo em um ambiente isolado em que apenas a função UDF é executada. Para obter mais detalhes, confira a documentação mlflow.pyfunc.spark_udf.

  2. Criar definição de trabalho:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    Dica

    Para usar um pool do Spark do Synapse anexado, defina a propriedade compute no arquivo de especificação YAML de exemplo mostrado acima em vez da propriedade resources.

  3. Os arquivos YAML mostrados acima podem ser usados no comando az ml job create, com o parâmetro --file, para criar um trabalho autônomo do Spark, conforme mostrado:

    az ml job create -f mlflow-score-spark-job.yml
    

Próximas etapas