Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Узнайте, как использовать функции PREDICT в бессерверных пулах Apache Spark в Azure Synapse Analytics для прогнозирования оценки. Вы можете использовать обученную модель, зарегистрированную в Машинном обучении Azure (AML) или в azure Data Lake Storage (ADLS) по умолчанию в рабочей области Synapse.
PREDICT в записной книжке Synapse PySpark предоставляет возможность оценки моделей машинного обучения с помощью языка SQL, определяемых пользователем функций (UDF) или Преобразователей. С помощью PREDICT можно перенести существующие модели машинного обучения, обученные за пределами Synapse и зарегистрированные в Azure Data Lake Storage 2-го поколения или Машинном обучении Azure, чтобы оценить исторические данные в пределах безопасных границ Azure Synapse Analytics. Функция PREDICT принимает модель и данные в качестве входных данных. Эта функция устраняет шаг перемещения ценных данных за пределы Synapse для оценки. Цель заключается в том, чтобы предоставить потребителям модели возможность легко использовать модели машинного обучения в Synapse, а также эффективно сотрудничать с производителями моделей, работающими с подходящим фреймворком для своей задачи.
В этом руководстве описано, как:
- Прогнозируйте оценки данных в бессерверном пуле Apache Spark с помощью моделей машинного обучения, которые обучены за пределами Synapse и зарегистрированы в Машинном обучении Azure или Azure Data Lake Storage 2-го поколения.
Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.
Prerequisites
- Рабочая область Azure Synapse Analytics с учетной записью хранения Azure Data Lake Storage 2-го поколения, настроенной в качестве хранилища по умолчанию. Вам необходимо быть участником данных хранилища блобов в файловой системе Data Lake Storage второго поколения, с которой вы работаете.
- Бессерверный пул Apache Spark в рабочей области Azure Synapse Analytics. Дополнительные сведения см. в статье Создание пула Spark в Azure Synapse.
- Рабочая область машинного обучения Azure необходима, если вы хотите обучить или зарегистрировать модель в Машинном обучении Azure. Дополнительные сведения см. в статье "Управление рабочими областями машинного обучения Azure" на портале или с помощью пакета SDK для Python.
- Если модель зарегистрирована в Машинном обучении Azure, вам нужна связанная служба. В Azure Synapse Analytics подключенная служба определяет информацию о вашем подключении к этой службе. В этом руководстве вы добавите связанную службу Azure Synapse Analytics и машинного обучения Azure. Дополнительные сведения см. в статье "Создание связанной службы машинного обучения Azure" в Synapse.
- Для функции PREDICT требуется, чтобы у вас уже была обученная модель, которая зарегистрирована в Машинном обучении Azure или отправлена в Azure Data Lake Storage 2-го поколения.
Note
- Функция PREDICT поддерживается в бессерверном пуле Spark3 Apache Spark в Azure Synapse Analytics. Рекомендуется использовать версию Python 3.8 для создания и обучения моделей.
- PREDICT поддерживает большинство пакетов моделей машинного обучения в формате MLflow : TensorFlow, ONNX, PyTorch, SkLearn и pyfunc поддерживаются в этой предварительной версии.
- PREDICT поддерживает модели AML и ADLS. Здесь учетная запись ADLS относится к учетной записи ADLS по умолчанию в рабочей области Synapse.
Войдите на портал Azure
Войдите на портал Azure.
Используйте PREDICT для упакованных моделей в MLFLOW
Убедитесь, что все необходимые условия выполнены, прежде чем следовать этим шагам для использования PREDICT.
Импорт библиотек: Импортируйте следующие библиотеки для использования PREDICT в сеансе Spark.
#Import libraries from pyspark.sql.functions import col, pandas_udf,udf,lit from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_loggerЗадайте параметры с помощью переменных: Путь к данным Synapse ADLS и URI модели необходимо задать с помощью входных переменных. Кроме того, необходимо определить среду выполнения, которая является "mlflow", и тип возвращаемых выходных данных модели. Обратите внимание, что все типы данных, поддерживаемые в PySpark, также поддерживаются с помощью PREDICT.
Note
Перед выполнением этого скрипта обновите его с помощью URI для файла данных ADLS 2-го поколения вместе с типом данных выходных данных модели и URI ADLS/AML для файла модели.
#Set input data path DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<file path>" #Set model URI #Set AML URI, if trained model is registered in AML AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can choose which model version you want to run. If ":x" is not provided then by default latest version will be picked. #Set ADLS URI, if trained model is uploaded in ADLS ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<model mlflow folder path>" #Define model return type RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported #Define model runtime. This supports only mlflow RUNTIME = "mlflow"Способы проверки подлинности рабочей области AML: Если модель хранится в учетной записи ADLS по умолчанию рабочей области Synapse, вам не требуется дополнительная настройка проверки подлинности. Если модель зарегистрирована в Машинном обучении Azure, можно выбрать один из следующих двух поддерживаемых способов проверки подлинности.
Note
Обновите арендатора, клиента, подписку, группу ресурсов, рабочее пространство AML и сведения о связанной службе в этом скрипте перед запуском.
(Рекомендуется) через связанную службу: Связанную службу можно использовать для проверки подлинности в рабочей области AML. Связанная служба может использовать "служебный принципал" или "Управляемая служебная идентификация (MSI)" рабочей области Synapse для проверки подлинности. Сервисный принципал или "Управляемое удостоверение службы (MSI)" должны иметь доступ с ролью "Участник" к рабочей области AML.
#AML workspace authentication using linked service from notebookutils.mssparkutils import azureML ws = azureML.getWorkspace("<linked_service_name>") # "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked service supports MSI and service principal bothС помощью служебного принципала: Хотя не рекомендуется, можно использовать идентификатор клиента служебного принципала и секрет непосредственно для проверки подлинности в рабочей области Azure Machine Learning. При предоставлении пароля служебного принципала напрямую возникает некоторый риск безопасности, поэтому мы рекомендуем использовать связанную службу при возможности. Субъект-служба должен иметь доступ "Участник" к рабочей области AML.
#AML workspace authentication using service principal AZURE_TENANT_ID = "<tenant_id>" AZURE_CLIENT_ID = "<client_id>" AZURE_CLIENT_SECRET = "<client_secret>" AML_SUBSCRIPTION_ID = "<subscription_id>" AML_RESOURCE_GROUP = "<resource_group_name>" AML_WORKSPACE_NAME = "<aml_workspace_name>" svc_pr = ServicePrincipalAuthentication( tenant_id=AZURE_TENANT_ID, service_principal_id=AZURE_CLIENT_ID, service_principal_password=AZURE_CLIENT_SECRET ) ws = Workspace( workspace_name = AML_WORKSPACE_NAME, subscription_id = AML_SUBSCRIPTION_ID, resource_group = AML_RESOURCE_GROUP, auth=svc_pr )
Включите функцию PREDICT в сеансе Spark: Установите конфигурацию Spark, чтобы включить библиотеку.
#Enable SynapseML predict spark.conf.set("spark.synapse.ml.predict.enabled","true")Привязка модели в сеансе Spark: Привязывайте модель с необходимыми входными данными, чтобы можно было ссылаться на модель в сеансе Spark. Также определите псевдоним, чтобы можно было использовать тот же псевдоним в вызове PREDICT.
Note
Обновите псевдоним модели и URI модели в этом скрипте перед запуском.
#Bind model within Spark session model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer this model model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed ).register()Чтение данных из ADLS: Чтение данных из ADLS. Создайте кадр данных Spark и представление на вершине кадра данных.
Note
Обновите название представления в этом скрипте перед его запуском.
#Read data from ADLS df = spark.read \ .format("csv") \ .option("header", "true") \ .csv(DATA_FILE, inferSchema=True) df.createOrReplaceTempView('<view_name>')Создание оценки с помощью PREDICT: Вы можете вызывать PREDICT тремя способами, используя API SQL Spark, используя функцию определяемой пользователем функции (UDF) и используя API преобразователя. Ниже приведены примеры.
Note
Обновите имя псевдонима модели, имя представления и имя входного столбца модели, разделенного запятыми, перед запуском этого скрипта. Входные столбцы модели, разделенные запятыми, совпадают с теми, которые используются при обучении модели.
#Call PREDICT using Spark SQL API predictions = spark.sql( """ SELECT PREDICT('<random_alias_name>', <comma_separated_model_input_column_name>) AS predict FROM <view_name> """ ).show()#Call PREDICT using user defined function (UDF) df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"] df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()#Call PREDICT using Transformer API columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"] transformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT") transformer.transform(df).show()
Пример Sklearn с помощью PREDICT
Импортируйте библиотеки и загрузите обучающий набор данных из ADLS.
# Import libraries and read training dataset from ADLS import fsspec import pandas from fsspec.core import split_protocol adls_account_name = 'xyz' #Provide exact ADLS account name adls_account_key = 'xyz' #Provide exact ADLS account key fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>', account_name=adls_account_name, account_key=adls_account_key) with fsspec_handle.open() as f: train_df = pandas.read_csv(f)Обучение модели и создание артефактов mlflow.
# Train model and generate mlflow artifacts import os import shutil import mlflow import json from mlflow.utils import model_utils import numpy as np import pandas as pd from sklearn.linear_model import LinearRegression class LinearRegressionModel(): _ARGS_FILENAME = 'args.json' FEATURES_KEY = 'features' TARGETS_KEY = 'targets' TARGETS_PRED_KEY = 'targets_pred' def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1): self.fit_intercept = fit_intercept self.nb_input_features = nb_input_features self.nb_output_features = nb_output_features def get_args(self): args = { 'nb_input_features': self.nb_input_features, 'nb_output_features': self.nb_output_features, 'fit_intercept': self.fit_intercept } return args def create_model(self): self.model = LinearRegression(fit_intercept=self.fit_intercept) def train(self, dataset): features = np.stack([sample for sample in iter( dataset[LinearRegressionModel.FEATURES_KEY])], axis=0) targets = np.stack([sample for sample in iter( dataset[LinearRegressionModel.TARGETS_KEY])], axis=0) self.model.fit(features, targets) def predict(self, dataset): features = np.stack([sample for sample in iter( dataset[LinearRegressionModel.FEATURES_KEY])], axis=0) targets_pred = self.model.predict(features) return targets_pred def save(self, path): if os.path.exists(path): shutil.rmtree(path) # save the sklearn model with mlflow mlflow.sklearn.save_model(self.model, path) # save args self._save_args(path) def _save_args(self, path): args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME) with open(args_filename, 'w') as f: args = self.get_args() json.dump(args, f) def train(train_df, output_model_path): print(f"Start to train LinearRegressionModel.") # Initialize input dataset dataset = train_df.to_numpy() datasets = {} datasets['targets'] = dataset[:, -1] datasets['features'] = dataset[:, :9] # Initialize model class obj model_class = LinearRegressionModel(fit_intercept=10) with mlflow.start_run(nested=True) as run: model_class.create_model() model_class.train(datasets) model_class.save(output_model_path) print(model_class.predict(datasets)) train(train_df, './artifacts/output')Храните артефакты модели MLflow в ADLS или зарегистрируйте их в AML.
# Store model MLFLOW artifacts in ADLS STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>' protocol, _ = split_protocol(STORAGE_PATH) print (protocol) storage_options = { 'account_name': adls_account_name, 'account_key': adls_account_key } fs = fsspec.filesystem(protocol, **storage_options) fs.put( './artifacts/output', STORAGE_PATH, recursive=True, overwrite=True)# Register model MLFLOW artifacts in AML from azureml.core import Workspace, Model from azureml.core.authentication import ServicePrincipalAuthentication from notebookutils.mssparkutils import azureML AZURE_TENANT_ID = "xyz" AZURE_CLIENT_ID = "xyz" AZURE_CLIENT_SECRET = "xyz" AML_SUBSCRIPTION_ID = "xyz" AML_RESOURCE_GROUP = "xyz" AML_WORKSPACE_NAME = "xyz" #AML workspace authentication using linked service ws = azureML.getWorkspace("<linked_service_name>") # "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked service supports MSI and service principal both model = Model.register( model_path="./artifacts/output", model_name="xyz", workspace=ws, )Задайте необходимые параметры с помощью переменных.
# If using ADLS uploaded model import pandas as pd from pyspark.sql import SparkSession from pyspark.sql.functions import col, pandas_udf,udf,lit import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv" ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.windows.net/mlflow/sklearn/ e2e_linear_regression/" RETURN_TYPES = "INT" RUNTIME = "mlflow"# If using AML registered model from pyspark.sql.functions import col, pandas_udf,udf,lit from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv" AML_MODEL_URI_SKLEARN = "aml://xyz" RETURN_TYPES = "INT" RUNTIME = "mlflow"Включите функции SynapseML PREDICT в сеансе Spark.
spark.conf.set("spark.synapse.ml.predict.enabled","true")Привязка модели в сеансе Spark.
# If using ADLS uploaded model model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="sklearn_linear_regression", model_uri=ADLS_MODEL_URI_SKLEARN, ).register()# If using AML registered model model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="sklearn_linear_regression", model_uri=AML_MODEL_URI_SKLEARN, aml_workspace=ws ).register()Загрузите данные для нагрузочного тестирования из ADLS.
# Load data from ADLS df = spark.read \ .format("csv") \ .option("header", "true") \ .csv(DATA_FILE, inferSchema=True) df = df.select(df.columns[:9]) df.createOrReplaceTempView('data') df.show(10)Вызовите PREDICT для генерации результата.
# Call PREDICT predictions = spark.sql( """ SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data """ ).show()