Esercitazione: Assegnare punteggi ai modelli di Machine Learning con PREDICT nei pool di Apache Spark serverless
Informazioni su come usare la funzionalità PREDICT nei pool di Apache Spark serverless in Azure Synapse Analytics per la stima del punteggio. È possibile usare un modello sottoposto a training registrato in Azure Machine Learning (AML) o nell'istanza predefinita di Azure Data Lake Storage (ADLS) nell'area di lavoro di Synapse.
PREDICT in un notebook PySpark di Synapse offre la possibilità di assegnare punteggi ai modelli di Machine Learning usando il linguaggio SQL, le funzioni definite dall'utente o i trasformatori. Con PREDICT è possibile portare i modelli di Machine Learning esistenti sottoposti a training all'esterno di Synapse e registrati in Azure Data Lake Storage Gen2 o Azure Machine Learning per assegnare punteggi ai dati cronologici entro i limiti sicuri di Azure Synapse Analytics. La funzione PREDICT accetta un modello e dati come input. Questa funzionalità elimina il passaggio dello spostamento di dati preziosi all'esterno di Synapse per l'assegnazione dei punteggi. L'obiettivo è consentire ai consumer di modelli di dedurre facilmente i modelli di Machine Learning in Synapse e collaborare senza problemi con i produttori di modelli che lavorano con il framework appropriato per l'attività.
Questa esercitazione illustra come:
- Prevedere i punteggi per i dati in un pool di Apache Spark serverless usando modelli di Machine Learning sottoposti a training all'esterno di Synapse e registrati in Azure Machine Learning o Azure Data Lake Storage Gen2.
Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
Prerequisiti
- Area di lavoro di Azure Synapse Analytics con un account di archiviazione di Azure Data Lake Storage Gen2 configurato come risorsa archiviazione predefinita. È necessario essere il Collaboratore ai dati dei BLOB della risorsa di archiviazione del file system di Data Lake Storage Gen2 con cui si lavora.
- Un pool di Apache Spark serverless nell'area di lavoro di Azure Synapse Analytics. Per i dettagli, vedere Creare un pool di Spark in Azure Synapse.
- L'area di lavoro di Azure Machine Learning è necessaria se si vuole eseguire il training o registrare un modello in Azure Machine Learning. Gestire le aree di lavoro di Azure Machine Learning nel portale o con Python SDK.
- Se il modello è registrato in Azure Machine Learning, è necessario un servizio collegato. In Azure Synapse Analytics si usano i servizi collegati per definire le informazioni di connessione ad altri servizi. In questa esercitazione si aggiungerà un servizio collegato Azure Synapse Analytics e Azure Machine Learning. Per altre informazioni, vedere Creare un nuovo servizio collegato di Azure Machine Learning in Synapse.
- La funzionalità PREDICT richiede che sia già disponibile un modello sottoposto a training registrato in Azure Machine Learning o caricato in Azure Data Lake Storage Gen2.
Nota
- La funzionalità PREDICT è supportata in un pool di Apache Spark serverless Spark3 in Azure Synapse Analytics. Python 3.8 è la versione consigliata per la creazione e il training del modello.
- PREDICT supporta la maggior parte dei pacchetti di modelli di Machine Learning in formato MLflow: TensorFlow, ONNX, PyTorch, SkLearn e pyfunc sono supportati in questa anteprima.
- PREDICT supporta origine del modello AML e ADLS. Qui l'account ADLS fa riferimento a account ADLS dell'area di lavoro Synapse predefinito.
Accedere al portale di Azure
Accedere al portale di Azure.
Usare PREDICT per i modelli in pacchetto MLFLOW
Assicurarsi che tutti i prerequisiti siano soddisfatti prima di seguire questi passaggi per l'uso di PREDICT.
Importa librerie: Importare le librerie seguenti per usare PREDICT nella sessione Spark.
#Import libraries from pyspark.sql.functions import col, pandas_udf,udf,lit from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
Impostare i parametri usando le variabili: il percorso dati ADLS di Synapse e l'URI del modello devono essere impostati usando le variabili di input. È anche necessario definire il runtime che è "mlflow" e il tipo di dati restituito dall'output del modello. Si noti che tutti i tipi di dati supportati in PySpark sono supportati anche tramite PREDICT.
Nota
Prima di eseguire questo script, aggiornarlo con l'URI per il file di dati ADLS Gen2 insieme al tipo di dati restituito dall'output del modello e all'URI ADLS/AML per il file di modello.
#Set input data path DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<file path>" #Set model URI #Set AML URI, if trained model is registered in AML AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can choose which model version you want to run. If ":x" is not provided then by default latest version will be picked. #Set ADLS URI, if trained model is uploaded in ADLS ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<model mlflow folder path>" #Define model return type RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported #Define model runtime. This supports only mlflow RUNTIME = "mlflow"
Modi per autenticare l'area di lavoro AML: Se il modello è archiviato nell'account ADLS predefinito dell'area di lavoro Synapse, non è necessaria alcuna configurazione di autenticazione ulteriore. Se il modello è registrato in Azure Machine Learning, è possibile scegliere uno dei due modi seguenti supportati per l'autenticazione.
Nota
Aggiornare i dettagli del tenant, del client, della sottoscrizione, del gruppo di risorse, dell'area di lavoro AML e del servizio collegato in questo script prima di eseguirlo.
Tramite l'entità servizio: È possibile usare l'ID client dell'entità servizio e il segreto direttamente per l'autenticazione nell'area di lavoro AML. L'entità servizio deve avere accesso "Collaboratore" all'area di lavoro AML.
#AML workspace authentication using service principal AZURE_TENANT_ID = "<tenant_id>" AZURE_CLIENT_ID = "<client_id>" AZURE_CLIENT_SECRET = "<client_secret>" AML_SUBSCRIPTION_ID = "<subscription_id>" AML_RESOURCE_GROUP = "<resource_group_name>" AML_WORKSPACE_NAME = "<aml_workspace_name>" svc_pr = ServicePrincipalAuthentication( tenant_id=AZURE_TENANT_ID, service_principal_id=AZURE_CLIENT_ID, service_principal_password=AZURE_CLIENT_SECRET ) ws = Workspace( workspace_name = AML_WORKSPACE_NAME, subscription_id = AML_SUBSCRIPTION_ID, resource_group = AML_RESOURCE_GROUP, auth=svc_pr )
Tramite il servizio collegato: È possibile usare il servizio collegato per l'autenticazione nell'area di lavoro AML. Il servizio collegato può usare "entità servizio" o l'area di lavoro di Synapse "Identità del servizio gestita (MSI)" per l'autenticazione. "Entità servizio" o "Identità del servizio gestita (MSI)" deve avere accesso "Collaboratore" all'area di lavoro AML.
#AML workspace authentication using linked service from notebookutils.mssparkutils import azureML ws = azureML.getWorkspace("<linked_service_name>") # "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked service supports MSI and service principal both
Abilitare PREDICT nella sessione Spark: Impostare la configurazione di Spark
spark.synapse.ml.predict.enabled
sutrue
per abilitare la libreria.#Enable SynapseML predict spark.conf.set("spark.synapse.ml.predict.enabled","true")
Modello Bind nella sessione Spark: Modello bind con input necessari in modo che il modello possa essere fatto riferimento nella sessione spark. Definire anche l'alias in modo che sia possibile usare lo stesso alias nella chiamata PREDICT.
Nota
Aggiornare l'alias del modello e l'URI del modello in questo script prima di eseguirlo.
#Bind model within Spark session model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer this model model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed ).register()
Leggere i dati da ADLS: Leggere i dati da ADLS. Creare un dataframe spark e una vista sopra il frame di dati.
Nota
Aggiornare il nome della visualizzazione in questo script prima di eseguirlo.
#Read data from ADLS df = spark.read \ .format("csv") \ .option("header", "true") \ .csv(DATA_FILE, inferSchema=True) df.createOrReplaceTempView('<view_name>')
Generare un punteggio usando PREDICT: È possibile chiamare PREDICT in tre modi, usando l'API SQL Spark, usando la funzione definita dall'utente e l'API Transformer. Alcuni esempi.
Nota
Aggiornare il nome dell'alias del modello, il nome della vista e il nome della colonna di input del modello delimitata da virgole in questo script prima di eseguirlo. Le colonne di input del modello delimitate da virgole corrispondono a quelle usate durante il training del modello.
#Call PREDICT using Spark SQL API predictions = spark.sql( """ SELECT PREDICT('<random_alias_name>', <comma_separated_model_input_column_name>) AS predict FROM <view_name> """ ).show()
#Call PREDICT using user defined function (UDF) df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"] df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()
#Call PREDICT using Transformer API columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"] tranformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT") tranformer.transform(df).show()
Esempio di sklearn con PREDICT
Importare librerie e leggere il set di dati di training da ADLS.
# Import libraries and read training dataset from ADLS import fsspec import pandas from fsspec.core import split_protocol adls_account_name = 'xyz' #Provide exact ADLS account name adls_account_key = 'xyz' #Provide exact ADLS account key fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>', account_name=adls_account_name, account_key=adls_account_key) with fsspec_handle.open() as f: train_df = pandas.read_csv(f)
Eseguire il training del modello e generare artefatti mlflow.
# Train model and generate mlflow artifacts import os import shutil import mlflow import json from mlflow.utils import model_utils import numpy as np import pandas as pd from sklearn.linear_model import LinearRegression class LinearRegressionModel(): _ARGS_FILENAME = 'args.json' FEATURES_KEY = 'features' TARGETS_KEY = 'targets' TARGETS_PRED_KEY = 'targets_pred' def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1): self.fit_intercept = fit_intercept self.nb_input_features = nb_input_features self.nb_output_features = nb_output_features def get_args(self): args = { 'nb_input_features': self.nb_input_features, 'nb_output_features': self.nb_output_features, 'fit_intercept': self.fit_intercept } return args def create_model(self): self.model = LinearRegression(fit_intercept=self.fit_intercept) def train(self, dataset): features = np.stack([sample for sample in iter( dataset[LinearRegressionModel.FEATURES_KEY])], axis=0) targets = np.stack([sample for sample in iter( dataset[LinearRegressionModel.TARGETS_KEY])], axis=0) self.model.fit(features, targets) def predict(self, dataset): features = np.stack([sample for sample in iter( dataset[LinearRegressionModel.FEATURES_KEY])], axis=0) targets_pred = self.model.predict(features) return targets_pred def save(self, path): if os.path.exists(path): shutil.rmtree(path) # save the sklearn model with mlflow mlflow.sklearn.save_model(self.model, path) # save args self._save_args(path) def _save_args(self, path): args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME) with open(args_filename, 'w') as f: args = self.get_args() json.dump(args, f) def train(train_df, output_model_path): print(f"Start to train LinearRegressionModel.") # Initialize input dataset dataset = train_df.to_numpy() datasets = {} datasets['targets'] = dataset[:, -1] datasets['features'] = dataset[:, :9] # Initialize model class obj model_class = LinearRegressionModel(fit_intercept=10) with mlflow.start_run(nested=True) as run: model_class.create_model() model_class.train(datasets) model_class.save(output_model_path) print(model_class.predict(datasets)) train(train_df, './artifacts/output')
Archiviare gli artefatti MLFLOW del modello in ADLS o registrarli in AML.
# Store model MLFLOW artifacts in ADLS STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>' protocol, _ = split_protocol(STORAGE_PATH) print (protocol) storage_options = { 'account_name': adls_account_name, 'account_key': adls_account_key } fs = fsspec.filesystem(protocol, **storage_options) fs.put( './artifacts/output', STORAGE_PATH, recursive=True, overwrite=True)
# Register model MLFLOW artifacts in AML from azureml.core import Workspace, Model from azureml.core.authentication import ServicePrincipalAuthentication AZURE_TENANT_ID = "xyz" AZURE_CLIENT_ID = "xyz" AZURE_CLIENT_SECRET = "xyz" AML_SUBSCRIPTION_ID = "xyz" AML_RESOURCE_GROUP = "xyz" AML_WORKSPACE_NAME = "xyz" svc_pr = ServicePrincipalAuthentication( tenant_id=AZURE_TENANT_ID, service_principal_id=AZURE_CLIENT_ID, service_principal_password=AZURE_CLIENT_SECRET ) ws = Workspace( workspace_name = AML_WORKSPACE_NAME, subscription_id = AML_SUBSCRIPTION_ID, resource_group = AML_RESOURCE_GROUP, auth=svc_pr ) model = Model.register( model_path="./artifacts/output", model_name="xyz", workspace=ws, )
Impostare i parametri obbligatori usando le variabili.
# If using ADLS uploaded model import pandas as pd from pyspark.sql import SparkSession from pyspark.sql.functions import col, pandas_udf,udf,lit import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv" ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.windows.net/mlflow/sklearn/ e2e_linear_regression/" RETURN_TYPES = "INT" RUNTIME = "mlflow"
# If using AML registered model from pyspark.sql.functions import col, pandas_udf,udf,lit from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv" AML_MODEL_URI_SKLEARN = "aml://xyz" RETURN_TYPES = "INT" RUNTIME = "mlflow"
Abilitare la funzionalità PREDICT di SynapseML nella sessione Spark.
spark.conf.set("spark.synapse.ml.predict.enabled","true")
Associare il modello nella sessione spark.
# If using ADLS uploaded model model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="sklearn_linear_regression", model_uri=ADLS_MODEL_URI_SKLEARN, ).register()
# If using AML registered model model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="sklearn_linear_regression", model_uri=AML_MODEL_URI_SKLEARN, aml_workspace=ws ).register()
Inserisci i dati di test da ADLS.
# Load data from ADLS df = spark.read \ .format("csv") \ .option("header", "true") \ .csv(DATA_FILE, inferSchema=True) df = df.select(df.columns[:9]) df.createOrReplaceTempView('data') df.show(10)
Chiamare PREDICT per generare il punteggio.
# Call PREDICT predictions = spark.sql( """ SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data """ ).show()