Condividi tramite


Esercitazione: Assegnare punteggi ai modelli di Machine Learning con PREDICT nei pool di Apache Spark serverless

Informazioni su come usare la funzionalità PREDICT nei pool di Apache Spark serverless in Azure Synapse Analytics per la stima del punteggio. È possibile usare un modello sottoposto a training registrato in Azure Machine Learning (AML) o nell'istanza predefinita di Azure Data Lake Storage (ADLS) nell'area di lavoro di Synapse.

PREDICT in un notebook PySpark di Synapse offre la possibilità di assegnare punteggi ai modelli di Machine Learning usando il linguaggio SQL, le funzioni definite dall'utente o i trasformatori. Con PREDICT è possibile portare i modelli di Machine Learning esistenti sottoposti a training all'esterno di Synapse e registrati in Azure Data Lake Storage Gen2 o Azure Machine Learning per assegnare punteggi ai dati cronologici entro i limiti sicuri di Azure Synapse Analytics. La funzione PREDICT accetta un modello e dati come input. Questa funzionalità elimina il passaggio dello spostamento di dati preziosi all'esterno di Synapse per l'assegnazione dei punteggi. L'obiettivo è consentire ai consumer di modelli di dedurre facilmente i modelli di Machine Learning in Synapse e collaborare senza problemi con i produttori di modelli che lavorano con il framework appropriato per l'attività.

Questa esercitazione illustra come:

  • Prevedere i punteggi per i dati in un pool di Apache Spark serverless usando modelli di Machine Learning sottoposti a training all'esterno di Synapse e registrati in Azure Machine Learning o Azure Data Lake Storage Gen2.

Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.

Prerequisiti

  • Area di lavoro di Azure Synapse Analytics con un account di archiviazione di Azure Data Lake Storage Gen2 configurato come risorsa archiviazione predefinita. È necessario essere il Collaboratore ai dati dei BLOB della risorsa di archiviazione del file system di Data Lake Storage Gen2 con cui si lavora.
  • Un pool di Apache Spark serverless nell'area di lavoro di Azure Synapse Analytics. Per i dettagli, vedere Creare un pool di Spark in Azure Synapse.
  • L'area di lavoro di Azure Machine Learning è necessaria se si vuole eseguire il training o registrare un modello in Azure Machine Learning. Gestire le aree di lavoro di Azure Machine Learning nel portale o con Python SDK.
  • Se il modello è registrato in Azure Machine Learning, è necessario un servizio collegato. In Azure Synapse Analytics si usano i servizi collegati per definire le informazioni di connessione ad altri servizi. In questa esercitazione si aggiungerà un servizio collegato Azure Synapse Analytics e Azure Machine Learning. Per altre informazioni, vedere Creare un nuovo servizio collegato di Azure Machine Learning in Synapse.
  • La funzionalità PREDICT richiede che sia già disponibile un modello sottoposto a training registrato in Azure Machine Learning o caricato in Azure Data Lake Storage Gen2.

Nota

  • La funzionalità PREDICT è supportata in un pool di Apache Spark serverless Spark3 in Azure Synapse Analytics. Python 3.8 è la versione consigliata per la creazione e il training del modello.
  • PREDICT supporta la maggior parte dei pacchetti di modelli di Machine Learning in formato MLflow: TensorFlow, ONNX, PyTorch, SkLearn e pyfunc sono supportati in questa anteprima.
  • PREDICT supporta origine del modello AML e ADLS. Qui l'account ADLS fa riferimento a account ADLS dell'area di lavoro Synapse predefinito.

Accedere al portale di Azure

Accedere al portale di Azure.

Usare PREDICT per i modelli in pacchetto MLFLOW

Assicurarsi che tutti i prerequisiti siano soddisfatti prima di seguire questi passaggi per l'uso di PREDICT.

  1. Importa librerie: Importare le librerie seguenti per usare PREDICT nella sessione Spark.

    #Import libraries
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
  2. Impostare i parametri usando le variabili: il percorso dati ADLS di Synapse e l'URI del modello devono essere impostati usando le variabili di input. È anche necessario definire il runtime che è "mlflow" e il tipo di dati restituito dall'output del modello. Si noti che tutti i tipi di dati supportati in PySpark sono supportati anche tramite PREDICT.

    Nota

    Prima di eseguire questo script, aggiornarlo con l'URI per il file di dati ADLS Gen2 insieme al tipo di dati restituito dall'output del modello e all'URI ADLS/AML per il file di modello.

    #Set input data path
    DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<file path>"
    
    #Set model URI
        #Set AML URI, if trained model is registered in AML
           AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can   choose which model version you want to run. If ":x" is not provided then by default   latest version will be picked.
    
        #Set ADLS URI, if trained model is uploaded in ADLS
           ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<model   mlflow folder path>"
    
    #Define model return type
    RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported
    
    #Define model runtime. This supports only mlflow
    RUNTIME = "mlflow"
    
  3. Modi per autenticare l'area di lavoro AML: Se il modello è archiviato nell'account ADLS predefinito dell'area di lavoro Synapse, non è necessaria alcuna configurazione di autenticazione ulteriore. Se il modello è registrato in Azure Machine Learning, è possibile scegliere uno dei due modi seguenti supportati per l'autenticazione.

    Nota

    Aggiornare i dettagli del tenant, del client, della sottoscrizione, del gruppo di risorse, dell'area di lavoro AML e del servizio collegato in questo script prima di eseguirlo.

    • Tramite l'entità servizio: È possibile usare l'ID client dell'entità servizio e il segreto direttamente per l'autenticazione nell'area di lavoro AML. L'entità servizio deve avere accesso "Collaboratore" all'area di lavoro AML.

      #AML workspace authentication using service principal
      AZURE_TENANT_ID = "<tenant_id>"
      AZURE_CLIENT_ID = "<client_id>"
      AZURE_CLIENT_SECRET = "<client_secret>"
      
      AML_SUBSCRIPTION_ID = "<subscription_id>"
      AML_RESOURCE_GROUP = "<resource_group_name>"
      AML_WORKSPACE_NAME = "<aml_workspace_name>"
      
      svc_pr = ServicePrincipalAuthentication( 
           tenant_id=AZURE_TENANT_ID,
           service_principal_id=AZURE_CLIENT_ID,
           service_principal_password=AZURE_CLIENT_SECRET
      )
      
      ws = Workspace(
           workspace_name = AML_WORKSPACE_NAME,
           subscription_id = AML_SUBSCRIPTION_ID,
           resource_group = AML_RESOURCE_GROUP,
           auth=svc_pr
      )
      
    • Tramite il servizio collegato: È possibile usare il servizio collegato per l'autenticazione nell'area di lavoro AML. Il servizio collegato può usare "entità servizio" o l'area di lavoro di Synapse "Identità del servizio gestita (MSI)" per l'autenticazione. "Entità servizio" o "Identità del servizio gestita (MSI)" deve avere accesso "Collaboratore" all'area di lavoro AML.

      #AML workspace authentication using linked service
      from notebookutils.mssparkutils import azureML
      ws = azureML.getWorkspace("<linked_service_name>") #   "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked   service supports MSI and service principal both
      
  4. Abilitare PREDICT nella sessione Spark: Impostare la configurazione di Spark spark.synapse.ml.predict.enabled su true per abilitare la libreria.

    #Enable SynapseML predict
    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  5. Modello Bind nella sessione Spark: Modello bind con input necessari in modo che il modello possa essere fatto riferimento nella sessione spark. Definire anche l'alias in modo che sia possibile usare lo stesso alias nella chiamata PREDICT.

    Nota

    Aggiornare l'alias del modello e l'URI del modello in questo script prima di eseguirlo.

    #Bind model within Spark session
     model = pcontext.bind_model(
      return_types=RETURN_TYPES, 
      runtime=RUNTIME, 
      model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer  this   model
      model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI
      aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed
      ).register()
    
  6. Leggere i dati da ADLS: Leggere i dati da ADLS. Creare un dataframe spark e una vista sopra il frame di dati.

    Nota

    Aggiornare il nome della visualizzazione in questo script prima di eseguirlo.

    #Read data from ADLS
    df = spark.read \
     .format("csv") \
     .option("header", "true") \
     .csv(DATA_FILE,
         inferSchema=True)
    df.createOrReplaceTempView('<view_name>')
    
  7. Generare un punteggio usando PREDICT: È possibile chiamare PREDICT in tre modi, usando l'API SQL Spark, usando la funzione definita dall'utente e l'API Transformer. Alcuni esempi.

    Nota

    Aggiornare il nome dell'alias del modello, il nome della vista e il nome della colonna di input del modello delimitata da virgole in questo script prima di eseguirlo. Le colonne di input del modello delimitate da virgole corrispondono a quelle usate durante il training del modello.

    #Call PREDICT using Spark SQL API
    
    predictions = spark.sql(
                   """
                       SELECT PREDICT('<random_alias_name>',
                                 <comma_separated_model_input_column_name>) AS predict 
                       FROM <view_name>
                   """
               ).show()
    
    #Call PREDICT using user defined function (UDF)
    
    df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()
    
    #Call PREDICT using Transformer API
    
    columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    tranformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT")
    
    tranformer.transform(df).show()
    

Esempio di sklearn con PREDICT

  1. Importare librerie e leggere il set di dati di training da ADLS.

    # Import libraries and read training dataset from ADLS
    
    import fsspec
    import pandas
    from fsspec.core import split_protocol
    
    adls_account_name = 'xyz' #Provide exact ADLS account name
    adls_account_key = 'xyz' #Provide exact ADLS account key
    
    fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>',      account_name=adls_account_name, account_key=adls_account_key)
    
    with fsspec_handle.open() as f:
        train_df = pandas.read_csv(f)
    
  2. Eseguire il training del modello e generare artefatti mlflow.

    # Train model and generate mlflow artifacts
    
    import os
    import shutil
    import mlflow
    import json
    from mlflow.utils import model_utils
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    
    
    class LinearRegressionModel():
      _ARGS_FILENAME = 'args.json'
      FEATURES_KEY = 'features'
      TARGETS_KEY = 'targets'
      TARGETS_PRED_KEY = 'targets_pred'
    
      def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1):
        self.fit_intercept = fit_intercept
        self.nb_input_features = nb_input_features
        self.nb_output_features = nb_output_features
    
      def get_args(self):
        args = {
            'nb_input_features': self.nb_input_features,
            'nb_output_features': self.nb_output_features,
            'fit_intercept': self.fit_intercept
        }
        return args
    
      def create_model(self):
        self.model = LinearRegression(fit_intercept=self.fit_intercept)
    
      def train(self, dataset):
    
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
    
        targets = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.TARGETS_KEY])], axis=0)
    
    
        self.model.fit(features, targets)
    
      def predict(self, dataset):
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
        targets_pred = self.model.predict(features)
        return targets_pred
    
      def save(self, path):
        if os.path.exists(path):
          shutil.rmtree(path)
    
        # save the sklearn model with mlflow
        mlflow.sklearn.save_model(self.model, path)
    
        # save args
        self._save_args(path)
    
      def _save_args(self, path):
        args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME)
        with open(args_filename, 'w') as f:
          args = self.get_args()
          json.dump(args, f)
    
    
    def train(train_df, output_model_path):
      print(f"Start to train LinearRegressionModel.")
    
      # Initialize input dataset
      dataset = train_df.to_numpy()
      datasets = {}
      datasets['targets'] = dataset[:, -1]
      datasets['features'] = dataset[:, :9]
    
      # Initialize model class obj
      model_class = LinearRegressionModel(fit_intercept=10)
      with mlflow.start_run(nested=True) as run:
        model_class.create_model()
        model_class.train(datasets)
        model_class.save(output_model_path)
        print(model_class.predict(datasets))
    
    
    train(train_df, './artifacts/output')
    
  3. Archiviare gli artefatti MLFLOW del modello in ADLS o registrarli in AML.

    # Store model MLFLOW artifacts in ADLS
    
    STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>'
    
    protocol, _ = split_protocol(STORAGE_PATH)
    print (protocol)
    
    storage_options = {
        'account_name': adls_account_name,
        'account_key': adls_account_key
    }
    fs = fsspec.filesystem(protocol, **storage_options)
    fs.put(
        './artifacts/output',
        STORAGE_PATH, 
        recursive=True, overwrite=True)
    
    # Register model MLFLOW artifacts in AML
    
    from azureml.core import Workspace, Model
    from azureml.core.authentication import ServicePrincipalAuthentication
    
    AZURE_TENANT_ID = "xyz"
    AZURE_CLIENT_ID = "xyz"
    AZURE_CLIENT_SECRET = "xyz"
    
    AML_SUBSCRIPTION_ID = "xyz"
    AML_RESOURCE_GROUP = "xyz"
    AML_WORKSPACE_NAME = "xyz"
    
    svc_pr = ServicePrincipalAuthentication( 
        tenant_id=AZURE_TENANT_ID,
        service_principal_id=AZURE_CLIENT_ID,
        service_principal_password=AZURE_CLIENT_SECRET
    )
    
    ws = Workspace(
        workspace_name = AML_WORKSPACE_NAME,
        subscription_id = AML_SUBSCRIPTION_ID,
        resource_group = AML_RESOURCE_GROUP,
        auth=svc_pr
    )
    
    model = Model.register(
        model_path="./artifacts/output",
        model_name="xyz",
        workspace=ws,
    )
    
  4. Impostare i parametri obbligatori usando le variabili.

    # If using ADLS uploaded model
    
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv"
    ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.windows.net/mlflow/sklearn/     e2e_linear_regression/"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
    # If using AML registered model
    
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv"
    AML_MODEL_URI_SKLEARN = "aml://xyz"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
  5. Abilitare la funzionalità PREDICT di SynapseML nella sessione Spark.

    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  6. Associare il modello nella sessione spark.

    # If using ADLS uploaded model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=ADLS_MODEL_URI_SKLEARN,
     ).register()
    
    # If using AML registered model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=AML_MODEL_URI_SKLEARN,
     aml_workspace=ws
     ).register()
    
  7. Inserisci i dati di test da ADLS.

    # Load data from ADLS
    
    df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .csv(DATA_FILE,
            inferSchema=True)
    df = df.select(df.columns[:9])
    df.createOrReplaceTempView('data')
    df.show(10)
    
  8. Chiamare PREDICT per generare il punteggio.

    # Call PREDICT
    
    predictions = spark.sql(
                      """
                          SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data
                      """
                  ).show()
    

Passaggi successivi