Kurz: Skóre modelů strojového učení pomocí funkce PREDICT v bezserverových fondech Apache Sparku
Naučte se používat funkce PREDICT v bezserverových fondech Apache Sparku v Azure Synapse Analytics k predikci skóre. Vytrénovaný model zaregistrovaný ve službě Azure Machine Learning (AML) nebo ve výchozím Azure Data Lake Storage (ADLS) můžete použít v pracovním prostoru Synapse.
Funkce PREDICT v poznámkovém bloku Synapse PySpark poskytuje možnost ohodnotit modely strojového učení pomocí jazyka SQL, uživatelem definovaných funkcí (UDF) nebo Transformátorů. S funkcí PREDICT můžete přenést stávající modely strojového učení natrénované mimo Synapse a zaregistrované v Azure Data Lake Storage Gen2 nebo Azure Machine Learning a ohodnotit historická data v rámci zabezpečených hranic Azure Synapse Analytics. Funkce PREDICT přebírá model a data jako vstupy. Tato funkce eliminuje krok přesunu cenných dat mimo Synapse pro vyhodnocování. Cílem je umožnit uživatelům modelů snadno odvodit modely strojového učení v Synapse a bezproblémově spolupracovat s producenty modelů, kteří pracují se správnou architekturou pro jejich úlohu.
V tomto kurzu se naučíte:
- Predikce skóre pro data v bezserverovém fondu Apache Sparku pomocí modelů strojového učení, které jsou natrénované mimo Synapse a zaregistrované ve službě Azure Machine Learning nebo Azure Data Lake Storage Gen2.
Pokud nemáte předplatné Azure, vytvořte si před zahájením bezplatného účtu.
Požadavky
- Azure Synapse pracovní prostor Analytics s Azure Data Lake Storage Gen2 účtem úložiště nakonfigurovaným jako výchozí úložiště. Musíte být přispěvatelem dat objektů blob služby Storage Data Lake Storage Gen2 systému souborů, se kterým pracujete.
- Bezserverový fond Apache Sparku v pracovním prostoru Azure Synapse Analytics. Podrobnosti najdete v tématu Vytvoření fondu Sparku v Azure Synapse.
- Pracovní prostor Azure Machine Learning je potřeba, pokud chcete trénovat nebo registrovat model ve službě Azure Machine Learning. Podrobnosti najdete v tématu Správa pracovních prostorů Služby Azure Machine Learning na portálu nebo pomocí sady Python SDK.
- Pokud je váš model zaregistrovaný ve službě Azure Machine Learning, potřebujete propojenou službu. V Azure Synapse Analytics definuje propojená služba informace o připojení ke službě. V tomto kurzu přidáte propojenou službu Azure Synapse Analytics a Azure Machine Learning. Další informace najdete v tématu Vytvoření nové propojené služby Azure Machine Learning v Synapse.
- Funkce PREDICT vyžaduje, abyste už měli natrénovaný model, který je zaregistrovaný ve službě Azure Machine Learning nebo nahraný v Azure Data Lake Storage Gen2.
Poznámka
- Funkce PREDICT je podporovaná v bezserverovém fondu Apache Sparku Spark3 v Azure Synapse Analytics. Python 3.8 je doporučená verze pro vytváření a trénování modelů.
- Predict podporuje většinu balíčků modelů strojového učení ve formátu MLflow : Tato verze Preview podporuje TensorFlow, ONNX, PyTorch, SkLearn a pyfunc .
- FUNKCE PREDICT podporuje zdroj modelů AML a ADLS . Tady účet ADLS odkazuje na výchozí účet ADLS pracovního prostoru Synapse.
Přihlášení k webu Azure Portal
Přihlaste se k webu Azure Portal.
Použití funkce PREDICT pro modely s balíčky MLFLOW
Před použitím funkce PREDICT se ujistěte, že jsou splněné všechny požadavky.
Import knihoven: Pokud chcete v relaci Sparku použít funkci PREDICT, naimportujte následující knihovny.
#Import libraries from pyspark.sql.functions import col, pandas_udf,udf,lit from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
Nastavení parametrů pomocí proměnných: Cestu k datům Synapse ADLS a identifikátor URI modelu je potřeba nastavit pomocí vstupních proměnných. Musíte také definovat modul runtime, což je "mlflow" a datový typ návratu výstupu modelu. Upozorňujeme, že funkce PREDICT podporuje také všechny datové typy podporované v PySparku.
Poznámka
Před spuštěním tohoto skriptu ho aktualizujte pomocí identifikátoru URI pro datový soubor ADLS Gen2 spolu s výstupním datovým typem modelu a identifikátorem URI ADLS/AML pro soubor modelu.
#Set input data path DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<file path>" #Set model URI #Set AML URI, if trained model is registered in AML AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can choose which model version you want to run. If ":x" is not provided then by default latest version will be picked. #Set ADLS URI, if trained model is uploaded in ADLS ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<model mlflow folder path>" #Define model return type RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported #Define model runtime. This supports only mlflow RUNTIME = "mlflow"
Způsoby ověřování pracovního prostoru AML: Pokud je model uložený ve výchozím účtu ADLS pracovního prostoru Synapse, nepotřebujete žádné další nastavení ověřování. Pokud je model zaregistrovaný ve službě Azure Machine Learning, můžete zvolit některý z následujících dvou podporovaných způsobů ověřování.
Poznámka
Před spuštěním tohoto skriptu aktualizujte podrobnosti o tenantovi, klientovi, předplatném, skupině prostředků, pracovním prostoru AML a propojené službě.
Prostřednictvím instančního objektu: K ověření v pracovním prostoru AML můžete přímo použít ID klienta instančního objektu a tajný klíč. Instanční objekt musí mít přístup Přispěvatel k pracovnímu prostoru AML.
#AML workspace authentication using service principal AZURE_TENANT_ID = "<tenant_id>" AZURE_CLIENT_ID = "<client_id>" AZURE_CLIENT_SECRET = "<client_secret>" AML_SUBSCRIPTION_ID = "<subscription_id>" AML_RESOURCE_GROUP = "<resource_group_name>" AML_WORKSPACE_NAME = "<aml_workspace_name>" svc_pr = ServicePrincipalAuthentication( tenant_id=AZURE_TENANT_ID, service_principal_id=AZURE_CLIENT_ID, service_principal_password=AZURE_CLIENT_SECRET ) ws = Workspace( workspace_name = AML_WORKSPACE_NAME, subscription_id = AML_SUBSCRIPTION_ID, resource_group = AML_RESOURCE_GROUP, auth=svc_pr )
Prostřednictvím propojené služby: K ověření v pracovním prostoru AML můžete použít propojenou službu. Propojená služba může k ověřování používat instanční objekt nebo identitu spravované služby (MSI) pracovního prostoru Synapse. Instanční objekt nebo identita spravované služby (MSI) musí mít přístup přispěvatele k pracovnímu prostoru AML.
#AML workspace authentication using linked service from notebookutils.mssparkutils import azureML ws = azureML.getWorkspace("<linked_service_name>") # "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked service supports MSI and service principal both
Povolení funkce PREDICT v relaci Sparku: Pokud chcete povolit knihovnu, nastavte konfiguraci
spark.synapse.ml.predict.enabled
true
Sparku na hodnotu .#Enable SynapseML predict spark.conf.set("spark.synapse.ml.predict.enabled","true")
Vytvoření vazby modelu v relaci Sparku: Vytvořte vazbu modelu s požadovanými vstupy, aby se na model mohl odkazovat v relaci Sparku. Definujte také alias, abyste mohli použít stejný alias ve volání PREDICT.
Poznámka
Před spuštěním tohoto skriptu aktualizujte alias modelu a identifikátor URI modelu.
#Bind model within Spark session model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer this model model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed ).register()
Čtení dat z ADLS: Čtení dat z ADLS Vytvořte datový rámec Sparku a zobrazení nad datovým rámcem.
Poznámka
Před spuštěním tohoto skriptu aktualizujte název zobrazení.
#Read data from ADLS df = spark.read \ .format("csv") \ .option("header", "true") \ .csv(DATA_FILE, inferSchema=True) df.createOrReplaceTempView('<view_name>')
Generování skóre pomocí funkce PREDICT: Funkci PREDICT můžete volat třemi způsoby: pomocí rozhraní SPARK SQL API, pomocí funkce UDF (User define Function) a pomocí rozhraní Transformer API. Tady jsou příklady.
Poznámka
Před spuštěním tohoto skriptu aktualizujte název aliasu modelu, název zobrazení a název vstupního sloupce modelu oddělený čárkami. Vstupní sloupce modelu oddělené čárkami jsou stejné jako sloupce použité při trénování modelu.
#Call PREDICT using Spark SQL API predictions = spark.sql( """ SELECT PREDICT('<random_alias_name>', <comma_separated_model_input_column_name>) AS predict FROM <view_name> """ ).show()
#Call PREDICT using user defined function (UDF) df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"] df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()
#Call PREDICT using Transformer API columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"] tranformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT") tranformer.transform(df).show()
Příklad Sklearnu s využitím funkce PREDICT
Importujte knihovny a přečtěte si trénovací datovou sadu z ADLS.
# Import libraries and read training dataset from ADLS import fsspec import pandas from fsspec.core import split_protocol adls_account_name = 'xyz' #Provide exact ADLS account name adls_account_key = 'xyz' #Provide exact ADLS account key fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>', account_name=adls_account_name, account_key=adls_account_key) with fsspec_handle.open() as f: train_df = pandas.read_csv(f)
Trénování modelu a generování artefaktů mlflow
# Train model and generate mlflow artifacts import os import shutil import mlflow import json from mlflow.utils import model_utils import numpy as np import pandas as pd from sklearn.linear_model import LinearRegression class LinearRegressionModel(): _ARGS_FILENAME = 'args.json' FEATURES_KEY = 'features' TARGETS_KEY = 'targets' TARGETS_PRED_KEY = 'targets_pred' def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1): self.fit_intercept = fit_intercept self.nb_input_features = nb_input_features self.nb_output_features = nb_output_features def get_args(self): args = { 'nb_input_features': self.nb_input_features, 'nb_output_features': self.nb_output_features, 'fit_intercept': self.fit_intercept } return args def create_model(self): self.model = LinearRegression(fit_intercept=self.fit_intercept) def train(self, dataset): features = np.stack([sample for sample in iter( dataset[LinearRegressionModel.FEATURES_KEY])], axis=0) targets = np.stack([sample for sample in iter( dataset[LinearRegressionModel.TARGETS_KEY])], axis=0) self.model.fit(features, targets) def predict(self, dataset): features = np.stack([sample for sample in iter( dataset[LinearRegressionModel.FEATURES_KEY])], axis=0) targets_pred = self.model.predict(features) return targets_pred def save(self, path): if os.path.exists(path): shutil.rmtree(path) # save the sklearn model with mlflow mlflow.sklearn.save_model(self.model, path) # save args self._save_args(path) def _save_args(self, path): args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME) with open(args_filename, 'w') as f: args = self.get_args() json.dump(args, f) def train(train_df, output_model_path): print(f"Start to train LinearRegressionModel.") # Initialize input dataset dataset = train_df.to_numpy() datasets = {} datasets['targets'] = dataset[:, -1] datasets['features'] = dataset[:, :9] # Initialize model class obj model_class = LinearRegressionModel(fit_intercept=10) with mlflow.start_run(nested=True) as run: model_class.create_model() model_class.train(datasets) model_class.save(output_model_path) print(model_class.predict(datasets)) train(train_df, './artifacts/output')
Uložte artefakty MLFLOW modelu v ADLS nebo zaregistrujte v AML.
# Store model MLFLOW artifacts in ADLS STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>' protocol, _ = split_protocol(STORAGE_PATH) print (protocol) storage_options = { 'account_name': adls_account_name, 'account_key': adls_account_key } fs = fsspec.filesystem(protocol, **storage_options) fs.put( './artifacts/output', STORAGE_PATH, recursive=True, overwrite=True)
# Register model MLFLOW artifacts in AML from azureml.core import Workspace, Model from azureml.core.authentication import ServicePrincipalAuthentication AZURE_TENANT_ID = "xyz" AZURE_CLIENT_ID = "xyz" AZURE_CLIENT_SECRET = "xyz" AML_SUBSCRIPTION_ID = "xyz" AML_RESOURCE_GROUP = "xyz" AML_WORKSPACE_NAME = "xyz" svc_pr = ServicePrincipalAuthentication( tenant_id=AZURE_TENANT_ID, service_principal_id=AZURE_CLIENT_ID, service_principal_password=AZURE_CLIENT_SECRET ) ws = Workspace( workspace_name = AML_WORKSPACE_NAME, subscription_id = AML_SUBSCRIPTION_ID, resource_group = AML_RESOURCE_GROUP, auth=svc_pr ) model = Model.register( model_path="./artifacts/output", model_name="xyz", workspace=ws, )
Nastavte požadované parametry pomocí proměnných.
# If using ADLS uploaded model import pandas as pd from pyspark.sql import SparkSession from pyspark.sql.functions import col, pandas_udf,udf,lit import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv" ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.windows.net/mlflow/sklearn/ e2e_linear_regression/" RETURN_TYPES = "INT" RUNTIME = "mlflow"
# If using AML registered model from pyspark.sql.functions import col, pandas_udf,udf,lit from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv" AML_MODEL_URI_SKLEARN = "aml://xyz" RETURN_TYPES = "INT" RUNTIME = "mlflow"
Povolte funkci SynapseML PREDICT v relaci Sparku.
spark.conf.set("spark.synapse.ml.predict.enabled","true")
Vytvoření vazby modelu v relaci Sparku
# If using ADLS uploaded model model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="sklearn_linear_regression", model_uri=ADLS_MODEL_URI_SKLEARN, ).register()
# If using AML registered model model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="sklearn_linear_regression", model_uri=AML_MODEL_URI_SKLEARN, aml_workspace=ws ).register()
Zátěžová testovací data z ADLS.
# Load data from ADLS df = spark.read \ .format("csv") \ .option("header", "true") \ .csv(DATA_FILE, inferSchema=True) df = df.select(df.columns[:9]) df.createOrReplaceTempView('data') df.show(10)
Zavolejte PREDICT a vygenerujte skóre.
# Call PREDICT predictions = spark.sql( """ SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data """ ).show()
Další kroky
Váš názor
https://aka.ms/ContentUserFeedback.
Připravujeme: V průběhu roku 2024 budeme postupně vyřazovat problémy z GitHub coby mechanismus zpětné vazby pro obsah a nahrazovat ho novým systémem zpětné vazby. Další informace naleznete v tématu:Odeslat a zobrazit názory pro