Implantar e executar modelos do MLflow em trabalhos do Spark
Neste artigo, saiba como implantar e executar seu modelo do MLflow em trabalhos do Spark para executar inferência em grandes quantidades de dados ou como parte de trabalhos de estruturação de dados.
Sobre este exemplo
Este exemplo mostra como você pode implantar um modelo do MLflow registrado no Azure Machine Learning em trabalhos do Spark em execução em clusters do Spark gerenciados (versão prévia), no Azure Databricks ou no Azure Synapse Analytics para executar inferência em grandes quantidades de dados.
O modelo é baseado no conjunto de dados UCI Heart Disease. O banco de dados contém 76 atributos, mas estamos usando um subconjunto de 14 deles. O modelo tenta prever a presença de doenças cardíacas em um paciente. Ele é inteiro com valor 0 (sem presença) a 1 (presença). Ele foi treinado usando um classificador XGBBoost
e todo o pré-processamento necessário foi empacotado como um pipeline scikit-learn
, tornando esse modelo um pipeline de ponta a ponta que vai de dados brutos a previsões.
As informações deste artigo se baseiam em exemplos de código contidos no repositório azureml-examples. Para executar os comandos localmente sem precisar copiar/colar arquivos, clone o repositório e altere os diretórios para sdk/using-mlflow/deploy
.
git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy
Pré-requisitos
Antes de seguir as etapas neste artigo, verifique se você tem os seguintes pré-requisitos:
Instale o pacote
mlflow
do SDK do MLflow e o plug-in do Azure Machine Learning para MLflowazureml-mlflow
.pip install mlflow azureml-mlflow
Dica
Você pode usar o pacote
mlflow-skinny
, que é um pacote MLflow leve sem dependências de armazenamento, servidor, interface do usuário ou ciência de dados do SQL.mlflow-skinny
é recomendado para usuários que precisam principalmente dos recursos de rastreamento e registro em log do MLflow sem importar o conjunto completo de recursos, incluindo implantações.Um Workspace do Azure Machine Learning. Você pode criar um seguindo o tutorial Criar recursos de machine learning.
Se você estiver executando o acompanhamento remoto (ou seja, acompanhando experimentos que estão sendo executados fora do Azure Machine Learning), configure o MLflow para apontar para o URI de acompanhamento do workspace do Azure Machine Learning. Para obter mais informações sobre como conectar o MLflow ao seu workspace, consulte Configurar o MLflow para o Azure Machine Learning.
- Você deve ter um modelo do MLflow registrado em seu workspace. Particularmente, este exemplo registrará um modelo treinado para o Conjunto de dados Diabetes.
Conectar-se ao workspace
Primeiro, vamos nos conectar ao workspace do Azure Machine Learning no qual o modelo está registrado.
O acompanhamento já está configurado para você. Suas credenciais padrão também serão usadas ao trabalhar com o MLflow.
Registrar o modelo
Precisamos de um modelo registrado no registro do Azure Machine Learning para executar a inferência. Nesse caso, já temos uma cópia local do modelo no repositório, portanto, só precisamos publicar o modelo no registro no workspace. Você poderá pular essa etapa se o modelo que está tentando implantar já estiver registrado.
model_name = 'heart-classifier'
model_local_path = "model"
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version
Como alternativa, se o modelo foi registrado dentro de uma execução, você pode registrá-lo diretamente.
Dica
Para registrar o modelo, você precisará saber onde ele foi armazenado. Se você estiver usando o recurso autolog
do MLflow, o caminho dependerá do tipo e da estrutura do modelo que está sendo utilizado. É recomendável verificar a saída dos trabalhos para identificar qual é o nome dessa pasta. Você pode procurar a pasta que contém um arquivo nomeado MLModel
. Se você estiver registrando os modelos manualmente usando log_model
, o caminho será o argumento que você passará para esse método. Por exemplo, se você registrar o modelo usando mlflow.sklearn.log_model(my_model, "classifier")
, o caminho em que o modelo está armazenado será classifier
.
model_name = 'heart-classifier'
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version
Observação
O caminho MODEL_PATH
é o local em que o modelo foi armazenado na execução.
Obter dados de entrada para pontuar
Precisaremos de alguns dados de entrada para execução dos trabalhos. Neste exemplo, baixaremos dados de exemplo da Internet e os colocaremos em um armazenamento compartilhado usado pelo cluster do Spark.
import urllib
urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")
Mova os dados para uma conta de armazenamento montada disponível para todo o cluster.
dbutils.fs.mv("file:/tmp/data", "dbfs:/")
Importante
O código anterior usa dbutils
, que é uma ferramenta disponível no cluster do Azure Databricks. Use a ferramenta apropriada dependendo da plataforma que você está usando.
Em seguida, os dados de entrada são colocados na seguinte pasta:
input_data_path = "dbfs:/data"
Executar o modelo em clusters do Spark
A seção a seguir explica como executar modelos do MLflow registrados no Azure Machine Learning em trabalhos do Spark.
Verifique se as seguintes bibliotecas estão instaladas no cluster:
- mlflow<3,>=2.1 - cloudpickle==2.2.0 - scikit-learn==1.2.0 - xgboost==1.7.2
Usaremos um notebook para demonstrar como criar uma rotina de pontuação com um modelo do MLflow registrado no Azure Machine Learning. Crie um notebook e use PySpark como a linguagem padrão.
Importe os namespaces necessários:
import mlflow import pyspark.sql.functions as f
Configure o URI do modelo. O URI a seguir traz um modelo chamado
heart-classifier
na versão mais recente.model_uri = "models:/heart-classifier/latest"
Carregue o modelo como uma função UDF. Uma UDF (função definida pelo usuário) é uma função definida por um usuário, que permite que lógica personalizada seja reutilizada no ambiente do usuário.
predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double')
Dica
Use o argumento
result_type
para controlar o tipo retornado pela funçãopredict()
.Leia os dados que deseja pontuar:
df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
Em nosso caso, os dados de entrada estão no formato
CSV
e foram colocados na pastadbfs:/data/
. Também estamos descartando a colunatarget
, pois esse conjunto de dados contém a variável de destino a ser prevista. Em cenários de produção, os dados não terão essa coluna.Execute a função
predict_function
e coloque as previsões em uma nova coluna. Nesse caso, estamos colocando as previsões na colunapredictions
.df.withColumn("predictions", score_function(*df.columns))
Dica
O
predict_function
recebe como argumentos as colunas necessárias. Em nosso caso, todas as colunas do quadro de dados são esperadas pelo modelo e, portanto,df.columns
é usado. Se o modelo exigir um subconjunto das colunas, você poderá apresentá-las manualmente. Se o modelo tiver uma assinatura, os tipos precisarão ser compatíveis entre entradas e tipos esperados.Você pode gravar as previsões de volta no armazenamento:
scored_data_path = "dbfs:/scored-data" scored_data.to_csv(scored_data_path)
Executar o modelo em um trabalho autônomo do Spark no Azure Machine Learning
O Azure Machine Learning dá suporte à criação de um trabalho autônomo do Spark e à criação de um componente reutilizável do Spark que podem ser usados em pipelines do Azure Machine Learning. Neste exemplo, implantaremos um trabalho de pontuação que é executado no trabalho do Spark autônomo do Azure Machine Learning e executa um modelo do MLflow para executar a inferência.
Observação
Para saber mais sobre trabalhos do Spark no Azure Machine Learning, confira Enviar trabalhos do Spark no Azure Machine Learning (versão prévia).
Um trabalho do Spark requer um script de Python que usa argumentos. Criar um script de pontuação
score.py
import argparse parser = argparse.ArgumentParser() parser.add_argument("--model") parser.add_argument("--input_data") parser.add_argument("--scored_data") args = parser.parse_args() print(args.model) print(args.input_data) # Load the model as an UDF function predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda") # Read the data you want to score df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target") # Run the function `predict_function` and place the predictions on a new column scored_data = df.withColumn("predictions", score_function(*df.columns)) # Save the predictions scored_data.to_csv(args.scored_data)
O script acima usa três argumentos;
--model
,--input_data
e--scored_data
. Os dois primeiros são entradas e representam o modelo que queremos executar e os dados de entrada, o último é uma saída e é a pasta de saída em que as previsões serão colocadas.Dica
Instalação de pacotes de Python: o script de pontuação anterior carrega o modelo do MLflow em uma função UDF, mas indica o parâmetro
env_manager="conda"
. Quando esse parâmetro for definido, o MLflow restaurará os pacotes necessários conforme especificado na definição do modelo em um ambiente isolado em que apenas a função UDF é executada. Para obter mais detalhes, confira a documentaçãomlflow.pyfunc.spark_udf
.Criar definição de trabalho:
mlflow-score-spark-job.yml
$schema: http://azureml/sdk-2-0/SparkJob.json type: spark code: ./src entry: file: score.py conf: spark.driver.cores: 1 spark.driver.memory: 2g spark.executor.cores: 2 spark.executor.memory: 2g spark.executor.instances: 2 inputs: model: type: mlflow_model path: azureml:heart-classifier@latest input_data: type: uri_file path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv mode: direct outputs: scored_data: type: uri_folder args: >- --model ${{inputs.model}} --input_data ${{inputs.input_data}} --scored_data ${{outputs.scored_data}} identity: type: user_identity resources: instance_type: standard_e4s_v3 runtime_version: "3.2"
Dica
Para usar um pool do Spark do Synapse anexado, defina a propriedade
compute
no arquivo de especificação YAML de exemplo mostrado acima em vez da propriedaderesources
.Os arquivos YAML mostrados acima podem ser usados no comando
az ml job create
, com o parâmetro--file
, para criar um trabalho autônomo do Spark, conforme mostrado:az ml job create -f mlflow-score-spark-job.yml