Självstudie: Poängsätta maskininlärningsmodeller med PREDICT i serverlösa Apache Spark-pooler
Lär dig hur du använder PREDICT-funktioner i serverlösa Apache Spark-pooler i Azure Synapse Analytics för poängförutsägelse. Du kan använda en tränad modell som är registrerad i Azure Machine Learning (AML) eller i standard-Azure Data Lake Storage (ADLS) i Synapse-arbetsytan.
PREDICT i en Synapse PySpark-notebook-fil ger dig möjlighet att poängsätta maskininlärningsmodeller med hjälp av SQL-språket, användardefinierade funktioner (UDF) eller Transformatorer. Med PREDICT kan du ta med dina befintliga maskininlärningsmodeller som tränats utanför Synapse och registrerats i Azure Data Lake Storage Gen2 eller Azure Machine Learning för att bedöma historiska data inom de säkra gränserna för Azure Synapse Analytics. Funktionen PREDICT tar en modell och data som indata. Den här funktionen eliminerar steget att flytta värdefulla data utanför Synapse för bedömning. Målet är att ge modellkonsumenter möjlighet att enkelt härleda maskininlärningsmodeller i Synapse och samarbeta sömlöst med modellproducenter som arbetar med rätt ramverk för sina uppgifter.
I den här självstudien får du lära dig att:
- Förutsäga poäng för data i en serverlös Apache Spark-pool med hjälp av maskininlärningsmodeller som tränas utanför Synapse och registreras i Azure Machine Learning eller Azure Data Lake Storage Gen2.
Om du inte har någon Azure-prenumeration kan du skapa ett kostnadsfritt konto innan du börjar.
Förutsättningar
- Azure Synapse Analytics-arbetsyta med ett Azure Data Lake Storage Gen2 lagringskonto konfigurerat som standardlagring. Du måste vara Storage Blob Data-deltagare för det Data Lake Storage Gen2 filsystem som du arbetar med.
- Serverlös Apache Spark-pool i din Azure Synapse Analytics-arbetsyta. Mer information finns i Skapa en Spark-pool i Azure Synapse.
- Azure Machine Learning-arbetsytan behövs om du vill träna eller registrera en modell i Azure Machine Learning. Mer information finns i Hantera Azure Machine Learning-arbetsytor i portalen eller med Python SDK.
- Om din modell är registrerad i Azure Machine Learning behöver du en länkad tjänst. I Azure Synapse Analytics definierar en länkad tjänst din anslutningsinformation till tjänsten. I den här självstudien lägger du till en länkad tjänst för Azure Synapse Analytics och Azure Machine Learning. Mer information finns i Skapa en ny länkad Azure Machine Learning-tjänst i Synapse.
- Funktionen PREDICT kräver att du redan har en tränad modell som antingen är registrerad i Azure Machine Learning eller laddas upp i Azure Data Lake Storage Gen2.
Anteckning
- Funktionen PREDICT stöds i serverlös Apache Spark-pool i Spark3 i Azure Synapse Analytics. Python 3.8 rekommenderas för att skapa och träna modeller.
- PREDICT stöder de flesta maskininlärningsmodellpaket i MLflow-format : TensorFlow, ONNX, PyTorch, SkLearn och pyfunc stöds i den här förhandsversionen.
- PREDICT stöder AML- och ADLS-modellkälla . Här refererar ADLS-kontot till ADLS-standardkontot för Synapse-arbetsytan.
Logga in på Azure Portal
Logga in på Azure-portalen.
Använda PREDICT för MLFLOW-paketerade modeller
Kontrollera att alla krav är uppfyllda innan du följer de här stegen för att använda PREDICT.
Importera bibliotek: Importera följande bibliotek för att använda PREDICT i Spark-sessionen.
#Import libraries from pyspark.sql.functions import col, pandas_udf,udf,lit from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
Ange parametrar med hjälp av variabler: Synapse ADLS-datasökvägen och modell-URI måste anges med hjälp av indatavariabler. Du måste också definiera körningen som är "mlflow" och datatypen för modellutdatareturen. Observera att alla datatyper som stöds i PySpark också stöds via PREDICT.
Anteckning
Innan du kör det här skriptet uppdaterar du det med URI:n för ADLS Gen2-datafilen tillsammans med modellutdatareturdatatypen och ADLS/AML-URI:n för modellfilen.
#Set input data path DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<file path>" #Set model URI #Set AML URI, if trained model is registered in AML AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can choose which model version you want to run. If ":x" is not provided then by default latest version will be picked. #Set ADLS URI, if trained model is uploaded in ADLS ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<model mlflow folder path>" #Define model return type RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported #Define model runtime. This supports only mlflow RUNTIME = "mlflow"
Sätt att autentisera AML-arbetsyta: Om modellen lagras i ADLS-standardkontot för Synapse-arbetsytan behöver du inte konfigurera ytterligare autentisering. Om modellen är registrerad i Azure Machine Learning kan du välja något av följande två sätt att autentisera.
Anteckning
Uppdatera klientorganisation, klient, prenumeration, resursgrupp, AML-arbetsyta och länkad tjänstinformation i det här skriptet innan du kör det.
Via tjänstens huvudnamn: Du kan använda klient-ID och hemlighet för tjänstens huvudnamn direkt för att autentisera till AML-arbetsytan. Tjänstens huvudnamn måste ha deltagaråtkomst till AML-arbetsytan.
#AML workspace authentication using service principal AZURE_TENANT_ID = "<tenant_id>" AZURE_CLIENT_ID = "<client_id>" AZURE_CLIENT_SECRET = "<client_secret>" AML_SUBSCRIPTION_ID = "<subscription_id>" AML_RESOURCE_GROUP = "<resource_group_name>" AML_WORKSPACE_NAME = "<aml_workspace_name>" svc_pr = ServicePrincipalAuthentication( tenant_id=AZURE_TENANT_ID, service_principal_id=AZURE_CLIENT_ID, service_principal_password=AZURE_CLIENT_SECRET ) ws = Workspace( workspace_name = AML_WORKSPACE_NAME, subscription_id = AML_SUBSCRIPTION_ID, resource_group = AML_RESOURCE_GROUP, auth=svc_pr )
Via länkad tjänst: Du kan använda länkad tjänst för att autentisera till AML-arbetsytan. Länkad tjänst kan använda "tjänstens huvudnamn" eller Synapse-arbetsytans "Hanterad tjänstidentitet (MSI)" för autentisering. "Tjänstens huvudnamn" eller "Hanterad tjänstidentitet (MSI)" måste ha deltagaråtkomst till AML-arbetsytan.
#AML workspace authentication using linked service from notebookutils.mssparkutils import azureML ws = azureML.getWorkspace("<linked_service_name>") # "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked service supports MSI and service principal both
Aktivera PREDICT i Spark-session: Ställ in Spark-konfigurationen
spark.synapse.ml.predict.enabled
påtrue
för att aktivera biblioteket.#Enable SynapseML predict spark.conf.set("spark.synapse.ml.predict.enabled","true")
Bindningsmodell i Spark-session: Binda modellen med nödvändiga indata så att modellen kan refereras i Spark-sessionen. Definiera även alias så att du kan använda samma alias i PREDICT-anropet.
Anteckning
Uppdatera modellalias och modell-URI i det här skriptet innan du kör det.
#Bind model within Spark session model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer this model model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed ).register()
Läsa data från ADLS: Läsa data från ADLS. Skapa spark-dataram och en vy ovanpå dataramen.
Anteckning
Uppdatera visningsnamnet i det här skriptet innan du kör det.
#Read data from ADLS df = spark.read \ .format("csv") \ .option("header", "true") \ .csv(DATA_FILE, inferSchema=True) df.createOrReplaceTempView('<view_name>')
Generera poäng med PREDICT: Du kan anropa PREDICT på tre sätt, använda Spark SQL API, använda UDF (User Define Function) och transformera API. Följande är exempel.
Anteckning
Uppdatera modellens aliasnamn, visningsnamn och kolumnnamnet för kommaavgränsade modellindata i det här skriptet innan du kör det. Indatakolumner för kommaavgränsade modeller är samma som de som används när modellen tränas.
#Call PREDICT using Spark SQL API predictions = spark.sql( """ SELECT PREDICT('<random_alias_name>', <comma_separated_model_input_column_name>) AS predict FROM <view_name> """ ).show()
#Call PREDICT using user defined function (UDF) df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"] df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()
#Call PREDICT using Transformer API columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"] tranformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT") tranformer.transform(df).show()
Sklearn-exempel med PREDICT
Importera bibliotek och läs träningsdatauppsättningen från ADLS.
# Import libraries and read training dataset from ADLS import fsspec import pandas from fsspec.core import split_protocol adls_account_name = 'xyz' #Provide exact ADLS account name adls_account_key = 'xyz' #Provide exact ADLS account key fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>', account_name=adls_account_name, account_key=adls_account_key) with fsspec_handle.open() as f: train_df = pandas.read_csv(f)
Träna modellen och generera mlflow-artefakter.
# Train model and generate mlflow artifacts import os import shutil import mlflow import json from mlflow.utils import model_utils import numpy as np import pandas as pd from sklearn.linear_model import LinearRegression class LinearRegressionModel(): _ARGS_FILENAME = 'args.json' FEATURES_KEY = 'features' TARGETS_KEY = 'targets' TARGETS_PRED_KEY = 'targets_pred' def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1): self.fit_intercept = fit_intercept self.nb_input_features = nb_input_features self.nb_output_features = nb_output_features def get_args(self): args = { 'nb_input_features': self.nb_input_features, 'nb_output_features': self.nb_output_features, 'fit_intercept': self.fit_intercept } return args def create_model(self): self.model = LinearRegression(fit_intercept=self.fit_intercept) def train(self, dataset): features = np.stack([sample for sample in iter( dataset[LinearRegressionModel.FEATURES_KEY])], axis=0) targets = np.stack([sample for sample in iter( dataset[LinearRegressionModel.TARGETS_KEY])], axis=0) self.model.fit(features, targets) def predict(self, dataset): features = np.stack([sample for sample in iter( dataset[LinearRegressionModel.FEATURES_KEY])], axis=0) targets_pred = self.model.predict(features) return targets_pred def save(self, path): if os.path.exists(path): shutil.rmtree(path) # save the sklearn model with mlflow mlflow.sklearn.save_model(self.model, path) # save args self._save_args(path) def _save_args(self, path): args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME) with open(args_filename, 'w') as f: args = self.get_args() json.dump(args, f) def train(train_df, output_model_path): print(f"Start to train LinearRegressionModel.") # Initialize input dataset dataset = train_df.to_numpy() datasets = {} datasets['targets'] = dataset[:, -1] datasets['features'] = dataset[:, :9] # Initialize model class obj model_class = LinearRegressionModel(fit_intercept=10) with mlflow.start_run(nested=True) as run: model_class.create_model() model_class.train(datasets) model_class.save(output_model_path) print(model_class.predict(datasets)) train(train_df, './artifacts/output')
Lagra MLFLOW-artefakter för modell i ADLS eller registrera i AML.
# Store model MLFLOW artifacts in ADLS STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>' protocol, _ = split_protocol(STORAGE_PATH) print (protocol) storage_options = { 'account_name': adls_account_name, 'account_key': adls_account_key } fs = fsspec.filesystem(protocol, **storage_options) fs.put( './artifacts/output', STORAGE_PATH, recursive=True, overwrite=True)
# Register model MLFLOW artifacts in AML from azureml.core import Workspace, Model from azureml.core.authentication import ServicePrincipalAuthentication AZURE_TENANT_ID = "xyz" AZURE_CLIENT_ID = "xyz" AZURE_CLIENT_SECRET = "xyz" AML_SUBSCRIPTION_ID = "xyz" AML_RESOURCE_GROUP = "xyz" AML_WORKSPACE_NAME = "xyz" svc_pr = ServicePrincipalAuthentication( tenant_id=AZURE_TENANT_ID, service_principal_id=AZURE_CLIENT_ID, service_principal_password=AZURE_CLIENT_SECRET ) ws = Workspace( workspace_name = AML_WORKSPACE_NAME, subscription_id = AML_SUBSCRIPTION_ID, resource_group = AML_RESOURCE_GROUP, auth=svc_pr ) model = Model.register( model_path="./artifacts/output", model_name="xyz", workspace=ws, )
Ange obligatoriska parametrar med hjälp av variabler.
# If using ADLS uploaded model import pandas as pd from pyspark.sql import SparkSession from pyspark.sql.functions import col, pandas_udf,udf,lit import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv" ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.windows.net/mlflow/sklearn/ e2e_linear_regression/" RETURN_TYPES = "INT" RUNTIME = "mlflow"
# If using AML registered model from pyspark.sql.functions import col, pandas_udf,udf,lit from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv" AML_MODEL_URI_SKLEARN = "aml://xyz" RETURN_TYPES = "INT" RUNTIME = "mlflow"
Aktivera SynapseML PREDICT-funktioner i Spark-sessionen.
spark.conf.set("spark.synapse.ml.predict.enabled","true")
Bind modell i Spark-session.
# If using ADLS uploaded model model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="sklearn_linear_regression", model_uri=ADLS_MODEL_URI_SKLEARN, ).register()
# If using AML registered model model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="sklearn_linear_regression", model_uri=AML_MODEL_URI_SKLEARN, aml_workspace=ws ).register()
Läs in testdata från ADLS.
# Load data from ADLS df = spark.read \ .format("csv") \ .option("header", "true") \ .csv(DATA_FILE, inferSchema=True) df = df.select(df.columns[:9]) df.createOrReplaceTempView('data') df.show(10)
Anropa PREDICT för att generera poängen.
# Call PREDICT predictions = spark.sql( """ SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data """ ).show()
Nästa steg
Feedback
https://aka.ms/ContentUserFeedback.
Kommer snart: Under hela 2024 kommer vi att fasa ut GitHub-problem som feedbackmekanism för innehåll och ersätta det med ett nytt feedbacksystem. Mer information finns i:Skicka och visa feedback för