Dela via


Självstudie: Poängsätta maskininlärningsmodeller med PREDICT i serverlösa Apache Spark-pooler

Lär dig hur du använder PREDICT-funktioner i serverlösa Apache Spark-pooler i Azure Synapse Analytics för poängförutsägelse. Du kan använda en tränad modell som är registrerad i Azure Machine Learning (AML) eller i standard-Azure Data Lake Storage (ADLS) i Synapse-arbetsytan.

PREDICT i en Synapse PySpark-notebook-fil ger dig möjlighet att poängsätta maskininlärningsmodeller med hjälp av SQL-språket, användardefinierade funktioner (UDF) eller Transformatorer. Med PREDICT kan du ta med dina befintliga maskininlärningsmodeller som tränats utanför Synapse och registrerats i Azure Data Lake Storage Gen2 eller Azure Machine Learning för att bedöma historiska data inom de säkra gränserna för Azure Synapse Analytics. Funktionen PREDICT tar en modell och data som indata. Den här funktionen eliminerar steget att flytta värdefulla data utanför Synapse för bedömning. Målet är att ge modellkonsumenter möjlighet att enkelt härleda maskininlärningsmodeller i Synapse och samarbeta sömlöst med modellproducenter som arbetar med rätt ramverk för sina uppgifter.

I den här självstudien får du lära dig att:

  • Förutsäga poäng för data i en serverlös Apache Spark-pool med hjälp av maskininlärningsmodeller som tränas utanför Synapse och registreras i Azure Machine Learning eller Azure Data Lake Storage Gen2.

Om du inte har någon Azure-prenumeration kan du skapa ett kostnadsfritt konto innan du börjar.

Förutsättningar

  • Azure Synapse Analytics-arbetsyta med ett Azure Data Lake Storage Gen2 lagringskonto konfigurerat som standardlagring. Du måste vara Storage Blob Data-deltagare för det Data Lake Storage Gen2 filsystem som du arbetar med.
  • Serverlös Apache Spark-pool i din Azure Synapse Analytics-arbetsyta. Mer information finns i Skapa en Spark-pool i Azure Synapse.
  • Azure Machine Learning-arbetsytan behövs om du vill träna eller registrera en modell i Azure Machine Learning. Mer information finns i Hantera Azure Machine Learning-arbetsytor i portalen eller med Python SDK.
  • Om din modell är registrerad i Azure Machine Learning behöver du en länkad tjänst. I Azure Synapse Analytics definierar en länkad tjänst din anslutningsinformation till tjänsten. I den här självstudien lägger du till en länkad tjänst för Azure Synapse Analytics och Azure Machine Learning. Mer information finns i Skapa en ny länkad Azure Machine Learning-tjänst i Synapse.
  • Funktionen PREDICT kräver att du redan har en tränad modell som antingen är registrerad i Azure Machine Learning eller laddas upp i Azure Data Lake Storage Gen2.

Anteckning

  • Funktionen PREDICT stöds i serverlös Apache Spark-pool i Spark3 i Azure Synapse Analytics. Python 3.8 rekommenderas för att skapa och träna modeller.
  • PREDICT stöder de flesta maskininlärningsmodellpaket i MLflow-format : TensorFlow, ONNX, PyTorch, SkLearn och pyfunc stöds i den här förhandsversionen.
  • PREDICT stöder AML- och ADLS-modellkälla . Här refererar ADLS-kontot till ADLS-standardkontot för Synapse-arbetsytan.

Logga in på Azure Portal

Logga in på Azure-portalen.

Använda PREDICT för MLFLOW-paketerade modeller

Kontrollera att alla krav är uppfyllda innan du följer de här stegen för att använda PREDICT.

  1. Importera bibliotek: Importera följande bibliotek för att använda PREDICT i Spark-sessionen.

    #Import libraries
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
  2. Ange parametrar med hjälp av variabler: Synapse ADLS-datasökvägen och modell-URI måste anges med hjälp av indatavariabler. Du måste också definiera körningen som är "mlflow" och datatypen för modellutdatareturen. Observera att alla datatyper som stöds i PySpark också stöds via PREDICT.

    Anteckning

    Innan du kör det här skriptet uppdaterar du det med URI:n för ADLS Gen2-datafilen tillsammans med modellutdatareturdatatypen och ADLS/AML-URI:n för modellfilen.

    #Set input data path
    DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<file path>"
    
    #Set model URI
        #Set AML URI, if trained model is registered in AML
           AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can   choose which model version you want to run. If ":x" is not provided then by default   latest version will be picked.
    
        #Set ADLS URI, if trained model is uploaded in ADLS
           ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<model   mlflow folder path>"
    
    #Define model return type
    RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported
    
    #Define model runtime. This supports only mlflow
    RUNTIME = "mlflow"
    
  3. Sätt att autentisera AML-arbetsyta: Om modellen lagras i ADLS-standardkontot för Synapse-arbetsytan behöver du inte konfigurera ytterligare autentisering. Om modellen är registrerad i Azure Machine Learning kan du välja något av följande två sätt att autentisera.

    Anteckning

    Uppdatera klientorganisation, klient, prenumeration, resursgrupp, AML-arbetsyta och länkad tjänstinformation i det här skriptet innan du kör det.

    • Via tjänstens huvudnamn: Du kan använda klient-ID och hemlighet för tjänstens huvudnamn direkt för att autentisera till AML-arbetsytan. Tjänstens huvudnamn måste ha deltagaråtkomst till AML-arbetsytan.

      #AML workspace authentication using service principal
      AZURE_TENANT_ID = "<tenant_id>"
      AZURE_CLIENT_ID = "<client_id>"
      AZURE_CLIENT_SECRET = "<client_secret>"
      
      AML_SUBSCRIPTION_ID = "<subscription_id>"
      AML_RESOURCE_GROUP = "<resource_group_name>"
      AML_WORKSPACE_NAME = "<aml_workspace_name>"
      
      svc_pr = ServicePrincipalAuthentication( 
           tenant_id=AZURE_TENANT_ID,
           service_principal_id=AZURE_CLIENT_ID,
           service_principal_password=AZURE_CLIENT_SECRET
      )
      
      ws = Workspace(
           workspace_name = AML_WORKSPACE_NAME,
           subscription_id = AML_SUBSCRIPTION_ID,
           resource_group = AML_RESOURCE_GROUP,
           auth=svc_pr
      )
      
    • Via länkad tjänst: Du kan använda länkad tjänst för att autentisera till AML-arbetsytan. Länkad tjänst kan använda "tjänstens huvudnamn" eller Synapse-arbetsytans "Hanterad tjänstidentitet (MSI)" för autentisering. "Tjänstens huvudnamn" eller "Hanterad tjänstidentitet (MSI)" måste ha deltagaråtkomst till AML-arbetsytan.

      #AML workspace authentication using linked service
      from notebookutils.mssparkutils import azureML
      ws = azureML.getWorkspace("<linked_service_name>") #   "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked   service supports MSI and service principal both
      
  4. Aktivera PREDICT i Spark-session: Ställ in Spark-konfigurationen spark.synapse.ml.predict.enabledtrue för att aktivera biblioteket.

    #Enable SynapseML predict
    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  5. Bindningsmodell i Spark-session: Binda modellen med nödvändiga indata så att modellen kan refereras i Spark-sessionen. Definiera även alias så att du kan använda samma alias i PREDICT-anropet.

    Anteckning

    Uppdatera modellalias och modell-URI i det här skriptet innan du kör det.

    #Bind model within Spark session
     model = pcontext.bind_model(
      return_types=RETURN_TYPES, 
      runtime=RUNTIME, 
      model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer  this   model
      model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI
      aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed
      ).register()
    
  6. Läsa data från ADLS: Läsa data från ADLS. Skapa spark-dataram och en vy ovanpå dataramen.

    Anteckning

    Uppdatera visningsnamnet i det här skriptet innan du kör det.

    #Read data from ADLS
    df = spark.read \
     .format("csv") \
     .option("header", "true") \
     .csv(DATA_FILE,
         inferSchema=True)
    df.createOrReplaceTempView('<view_name>')
    
  7. Generera poäng med PREDICT: Du kan anropa PREDICT på tre sätt, använda Spark SQL API, använda UDF (User Define Function) och transformera API. Följande är exempel.

    Anteckning

    Uppdatera modellens aliasnamn, visningsnamn och kolumnnamnet för kommaavgränsade modellindata i det här skriptet innan du kör det. Indatakolumner för kommaavgränsade modeller är samma som de som används när modellen tränas.

    #Call PREDICT using Spark SQL API
    
    predictions = spark.sql(
                   """
                       SELECT PREDICT('<random_alias_name>',
                                 <comma_separated_model_input_column_name>) AS predict 
                       FROM <view_name>
                   """
               ).show()
    
    #Call PREDICT using user defined function (UDF)
    
    df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()
    
    #Call PREDICT using Transformer API
    
    columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    tranformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT")
    
    tranformer.transform(df).show()
    

Sklearn-exempel med PREDICT

  1. Importera bibliotek och läs träningsdatauppsättningen från ADLS.

    # Import libraries and read training dataset from ADLS
    
    import fsspec
    import pandas
    from fsspec.core import split_protocol
    
    adls_account_name = 'xyz' #Provide exact ADLS account name
    adls_account_key = 'xyz' #Provide exact ADLS account key
    
    fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>',      account_name=adls_account_name, account_key=adls_account_key)
    
    with fsspec_handle.open() as f:
        train_df = pandas.read_csv(f)
    
  2. Träna modellen och generera mlflow-artefakter.

    # Train model and generate mlflow artifacts
    
    import os
    import shutil
    import mlflow
    import json
    from mlflow.utils import model_utils
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    
    
    class LinearRegressionModel():
      _ARGS_FILENAME = 'args.json'
      FEATURES_KEY = 'features'
      TARGETS_KEY = 'targets'
      TARGETS_PRED_KEY = 'targets_pred'
    
      def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1):
        self.fit_intercept = fit_intercept
        self.nb_input_features = nb_input_features
        self.nb_output_features = nb_output_features
    
      def get_args(self):
        args = {
            'nb_input_features': self.nb_input_features,
            'nb_output_features': self.nb_output_features,
            'fit_intercept': self.fit_intercept
        }
        return args
    
      def create_model(self):
        self.model = LinearRegression(fit_intercept=self.fit_intercept)
    
      def train(self, dataset):
    
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
    
        targets = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.TARGETS_KEY])], axis=0)
    
    
        self.model.fit(features, targets)
    
      def predict(self, dataset):
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
        targets_pred = self.model.predict(features)
        return targets_pred
    
      def save(self, path):
        if os.path.exists(path):
          shutil.rmtree(path)
    
        # save the sklearn model with mlflow
        mlflow.sklearn.save_model(self.model, path)
    
        # save args
        self._save_args(path)
    
      def _save_args(self, path):
        args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME)
        with open(args_filename, 'w') as f:
          args = self.get_args()
          json.dump(args, f)
    
    
    def train(train_df, output_model_path):
      print(f"Start to train LinearRegressionModel.")
    
      # Initialize input dataset
      dataset = train_df.to_numpy()
      datasets = {}
      datasets['targets'] = dataset[:, -1]
      datasets['features'] = dataset[:, :9]
    
      # Initialize model class obj
      model_class = LinearRegressionModel(fit_intercept=10)
      with mlflow.start_run(nested=True) as run:
        model_class.create_model()
        model_class.train(datasets)
        model_class.save(output_model_path)
        print(model_class.predict(datasets))
    
    
    train(train_df, './artifacts/output')
    
  3. Lagra MLFLOW-artefakter för modell i ADLS eller registrera i AML.

    # Store model MLFLOW artifacts in ADLS
    
    STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>'
    
    protocol, _ = split_protocol(STORAGE_PATH)
    print (protocol)
    
    storage_options = {
        'account_name': adls_account_name,
        'account_key': adls_account_key
    }
    fs = fsspec.filesystem(protocol, **storage_options)
    fs.put(
        './artifacts/output',
        STORAGE_PATH, 
        recursive=True, overwrite=True)
    
    # Register model MLFLOW artifacts in AML
    
    from azureml.core import Workspace, Model
    from azureml.core.authentication import ServicePrincipalAuthentication
    
    AZURE_TENANT_ID = "xyz"
    AZURE_CLIENT_ID = "xyz"
    AZURE_CLIENT_SECRET = "xyz"
    
    AML_SUBSCRIPTION_ID = "xyz"
    AML_RESOURCE_GROUP = "xyz"
    AML_WORKSPACE_NAME = "xyz"
    
    svc_pr = ServicePrincipalAuthentication( 
        tenant_id=AZURE_TENANT_ID,
        service_principal_id=AZURE_CLIENT_ID,
        service_principal_password=AZURE_CLIENT_SECRET
    )
    
    ws = Workspace(
        workspace_name = AML_WORKSPACE_NAME,
        subscription_id = AML_SUBSCRIPTION_ID,
        resource_group = AML_RESOURCE_GROUP,
        auth=svc_pr
    )
    
    model = Model.register(
        model_path="./artifacts/output",
        model_name="xyz",
        workspace=ws,
    )
    
  4. Ange obligatoriska parametrar med hjälp av variabler.

    # If using ADLS uploaded model
    
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv"
    ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.windows.net/mlflow/sklearn/     e2e_linear_regression/"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
    # If using AML registered model
    
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv"
    AML_MODEL_URI_SKLEARN = "aml://xyz"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
  5. Aktivera SynapseML PREDICT-funktioner i Spark-sessionen.

    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  6. Bind modell i Spark-session.

    # If using ADLS uploaded model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=ADLS_MODEL_URI_SKLEARN,
     ).register()
    
    # If using AML registered model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=AML_MODEL_URI_SKLEARN,
     aml_workspace=ws
     ).register()
    
  7. Läs in testdata från ADLS.

    # Load data from ADLS
    
    df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .csv(DATA_FILE,
            inferSchema=True)
    df = df.select(df.columns[:9])
    df.createOrReplaceTempView('data')
    df.show(10)
    
  8. Anropa PREDICT för att generera poängen.

    # Call PREDICT
    
    predictions = spark.sql(
                      """
                          SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data
                      """
                  ).show()
    

Nästa steg