Руководство. Оценка моделей машинного обучения с помощью PREDICT в бессерверных пулах Apache Spark

Узнайте, как использовать функции PREDICT в бессерверных пулах Apache Spark в Azure Synapse Analytics для прогнозирования оценки. Вы можете использовать обученную модель, зарегистрированную в Машинном обучении Azure (AML) или в azure Data Lake Storage (ADLS) по умолчанию в рабочей области Synapse.

PREDICT в записной книжке Synapse PySpark предоставляет возможность оценки моделей машинного обучения с помощью языка SQL, определяемых пользователем функций (UDF) или Преобразователей. С помощью PREDICT можно перенести существующие модели машинного обучения, обученные за пределами Synapse и зарегистрированные в Azure Data Lake Storage 2-го поколения или Машинном обучении Azure, чтобы оценить исторические данные в пределах безопасных границ Azure Synapse Analytics. Функция PREDICT принимает модель и данные в качестве входных данных. Эта функция устраняет шаг перемещения ценных данных за пределы Synapse для оценки. Цель заключается в том, чтобы предоставить потребителям модели возможность легко использовать модели машинного обучения в Synapse, а также эффективно сотрудничать с производителями моделей, работающими с подходящим фреймворком для своей задачи.

В этом руководстве описано, как:

  • Прогнозируйте оценки данных в бессерверном пуле Apache Spark с помощью моделей машинного обучения, которые обучены за пределами Synapse и зарегистрированы в Машинном обучении Azure или Azure Data Lake Storage 2-го поколения.

Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.

Prerequisites

  • Рабочая область Azure Synapse Analytics с учетной записью хранения Azure Data Lake Storage 2-го поколения, настроенной в качестве хранилища по умолчанию. Вам необходимо быть участником данных хранилища блобов в файловой системе Data Lake Storage второго поколения, с которой вы работаете.
  • Бессерверный пул Apache Spark в рабочей области Azure Synapse Analytics. Дополнительные сведения см. в статье Создание пула Spark в Azure Synapse.
  • Рабочая область машинного обучения Azure необходима, если вы хотите обучить или зарегистрировать модель в Машинном обучении Azure. Дополнительные сведения см. в статье "Управление рабочими областями машинного обучения Azure" на портале или с помощью пакета SDK для Python.
  • Если модель зарегистрирована в Машинном обучении Azure, вам нужна связанная служба. В Azure Synapse Analytics подключенная служба определяет информацию о вашем подключении к этой службе. В этом руководстве вы добавите связанную службу Azure Synapse Analytics и машинного обучения Azure. Дополнительные сведения см. в статье "Создание связанной службы машинного обучения Azure" в Synapse.
  • Для функции PREDICT требуется, чтобы у вас уже была обученная модель, которая зарегистрирована в Машинном обучении Azure или отправлена в Azure Data Lake Storage 2-го поколения.

Note

  • Функция PREDICT поддерживается в бессерверном пуле Spark3 Apache Spark в Azure Synapse Analytics. Рекомендуется использовать версию Python 3.8 для создания и обучения моделей.
  • PREDICT поддерживает большинство пакетов моделей машинного обучения в формате MLflow : TensorFlow, ONNX, PyTorch, SkLearn и pyfunc поддерживаются в этой предварительной версии.
  • PREDICT поддерживает модели AML и ADLS. Здесь учетная запись ADLS относится к учетной записи ADLS по умолчанию в рабочей области Synapse.

Войдите на портал Azure

Войдите на портал Azure.

Используйте PREDICT для упакованных моделей в MLFLOW

Убедитесь, что все необходимые условия выполнены, прежде чем следовать этим шагам для использования PREDICT.

  1. Импорт библиотек: Импортируйте следующие библиотеки для использования PREDICT в сеансе Spark.

    #Import libraries
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
  2. Задайте параметры с помощью переменных: Путь к данным Synapse ADLS и URI модели необходимо задать с помощью входных переменных. Кроме того, необходимо определить среду выполнения, которая является "mlflow", и тип возвращаемых выходных данных модели. Обратите внимание, что все типы данных, поддерживаемые в PySpark, также поддерживаются с помощью PREDICT.

    Note

    Перед выполнением этого скрипта обновите его с помощью URI для файла данных ADLS 2-го поколения вместе с типом данных выходных данных модели и URI ADLS/AML для файла модели.

    #Set input data path
    DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<file path>"
    
    #Set model URI
        #Set AML URI, if trained model is registered in AML
           AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can   choose which model version you want to run. If ":x" is not provided then by default   latest version will be picked.
    
        #Set ADLS URI, if trained model is uploaded in ADLS
           ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<model   mlflow folder path>"
    
    #Define model return type
    RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported
    
    #Define model runtime. This supports only mlflow
    RUNTIME = "mlflow"
    
  3. Способы проверки подлинности рабочей области AML: Если модель хранится в учетной записи ADLS по умолчанию рабочей области Synapse, вам не требуется дополнительная настройка проверки подлинности. Если модель зарегистрирована в Машинном обучении Azure, можно выбрать один из следующих двух поддерживаемых способов проверки подлинности.

    Note

    Обновите арендатора, клиента, подписку, группу ресурсов, рабочее пространство AML и сведения о связанной службе в этом скрипте перед запуском.

    • (Рекомендуется) через связанную службу: Связанную службу можно использовать для проверки подлинности в рабочей области AML. Связанная служба может использовать "служебный принципал" или "Управляемая служебная идентификация (MSI)" рабочей области Synapse для проверки подлинности. Сервисный принципал или "Управляемое удостоверение службы (MSI)" должны иметь доступ с ролью "Участник" к рабочей области AML.

      #AML workspace authentication using linked service
      from notebookutils.mssparkutils import azureML
      ws = azureML.getWorkspace("<linked_service_name>") #   "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked   service supports MSI and service principal both
      
    • С помощью служебного принципала: Хотя не рекомендуется, можно использовать идентификатор клиента служебного принципала и секрет непосредственно для проверки подлинности в рабочей области Azure Machine Learning. При предоставлении пароля служебного принципала напрямую возникает некоторый риск безопасности, поэтому мы рекомендуем использовать связанную службу при возможности. Субъект-служба должен иметь доступ "Участник" к рабочей области AML.

      #AML workspace authentication using service principal
      AZURE_TENANT_ID = "<tenant_id>"
      AZURE_CLIENT_ID = "<client_id>"
      AZURE_CLIENT_SECRET = "<client_secret>"
      
      AML_SUBSCRIPTION_ID = "<subscription_id>"
      AML_RESOURCE_GROUP = "<resource_group_name>"
      AML_WORKSPACE_NAME = "<aml_workspace_name>"
      
      svc_pr = ServicePrincipalAuthentication( 
           tenant_id=AZURE_TENANT_ID,
           service_principal_id=AZURE_CLIENT_ID,
           service_principal_password=AZURE_CLIENT_SECRET
      )
      
      ws = Workspace(
           workspace_name = AML_WORKSPACE_NAME,
           subscription_id = AML_SUBSCRIPTION_ID,
           resource_group = AML_RESOURCE_GROUP,
           auth=svc_pr
      )
      
  4. Включите функцию PREDICT в сеансе Spark: Установите конфигурацию Spark, чтобы включить библиотеку.

    #Enable SynapseML predict
    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  5. Привязка модели в сеансе Spark: Привязывайте модель с необходимыми входными данными, чтобы можно было ссылаться на модель в сеансе Spark. Также определите псевдоним, чтобы можно было использовать тот же псевдоним в вызове PREDICT.

    Note

    Обновите псевдоним модели и URI модели в этом скрипте перед запуском.

    #Bind model within Spark session
     model = pcontext.bind_model(
      return_types=RETURN_TYPES, 
      runtime=RUNTIME, 
      model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer  this   model
      model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI
      aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed
      ).register()
    
  6. Чтение данных из ADLS: Чтение данных из ADLS. Создайте кадр данных Spark и представление на вершине кадра данных.

    Note

    Обновите название представления в этом скрипте перед его запуском.

    #Read data from ADLS
    df = spark.read \
     .format("csv") \
     .option("header", "true") \
     .csv(DATA_FILE,
         inferSchema=True)
    df.createOrReplaceTempView('<view_name>')
    
  7. Создание оценки с помощью PREDICT: Вы можете вызывать PREDICT тремя способами, используя API SQL Spark, используя функцию определяемой пользователем функции (UDF) и используя API преобразователя. Ниже приведены примеры.

    Note

    Обновите имя псевдонима модели, имя представления и имя входного столбца модели, разделенного запятыми, перед запуском этого скрипта. Входные столбцы модели, разделенные запятыми, совпадают с теми, которые используются при обучении модели.

    #Call PREDICT using Spark SQL API
    
    predictions = spark.sql(
                   """
                       SELECT PREDICT('<random_alias_name>',
                                 <comma_separated_model_input_column_name>) AS predict 
                       FROM <view_name>
                   """
               ).show()
    
    #Call PREDICT using user defined function (UDF)
    
    df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()
    
    #Call PREDICT using Transformer API
    
    columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    transformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT")
    
    transformer.transform(df).show()
    

Пример Sklearn с помощью PREDICT

  1. Импортируйте библиотеки и загрузите обучающий набор данных из ADLS.

    # Import libraries and read training dataset from ADLS
    
    import fsspec
    import pandas
    from fsspec.core import split_protocol
    
    adls_account_name = 'xyz' #Provide exact ADLS account name
    adls_account_key = 'xyz' #Provide exact ADLS account key
    
    fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>',      account_name=adls_account_name, account_key=adls_account_key)
    
    with fsspec_handle.open() as f:
        train_df = pandas.read_csv(f)
    
  2. Обучение модели и создание артефактов mlflow.

    # Train model and generate mlflow artifacts
    
    import os
    import shutil
    import mlflow
    import json
    from mlflow.utils import model_utils
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    
    
    class LinearRegressionModel():
      _ARGS_FILENAME = 'args.json'
      FEATURES_KEY = 'features'
      TARGETS_KEY = 'targets'
      TARGETS_PRED_KEY = 'targets_pred'
    
      def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1):
        self.fit_intercept = fit_intercept
        self.nb_input_features = nb_input_features
        self.nb_output_features = nb_output_features
    
      def get_args(self):
        args = {
            'nb_input_features': self.nb_input_features,
            'nb_output_features': self.nb_output_features,
            'fit_intercept': self.fit_intercept
        }
        return args
    
      def create_model(self):
        self.model = LinearRegression(fit_intercept=self.fit_intercept)
    
      def train(self, dataset):
    
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
    
        targets = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.TARGETS_KEY])], axis=0)
    
    
        self.model.fit(features, targets)
    
      def predict(self, dataset):
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
        targets_pred = self.model.predict(features)
        return targets_pred
    
      def save(self, path):
        if os.path.exists(path):
          shutil.rmtree(path)
    
        # save the sklearn model with mlflow
        mlflow.sklearn.save_model(self.model, path)
    
        # save args
        self._save_args(path)
    
      def _save_args(self, path):
        args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME)
        with open(args_filename, 'w') as f:
          args = self.get_args()
          json.dump(args, f)
    
    
    def train(train_df, output_model_path):
      print(f"Start to train LinearRegressionModel.")
    
      # Initialize input dataset
      dataset = train_df.to_numpy()
      datasets = {}
      datasets['targets'] = dataset[:, -1]
      datasets['features'] = dataset[:, :9]
    
      # Initialize model class obj
      model_class = LinearRegressionModel(fit_intercept=10)
      with mlflow.start_run(nested=True) as run:
        model_class.create_model()
        model_class.train(datasets)
        model_class.save(output_model_path)
        print(model_class.predict(datasets))
    
    
    train(train_df, './artifacts/output')
    
  3. Храните артефакты модели MLflow в ADLS или зарегистрируйте их в AML.

    # Store model MLFLOW artifacts in ADLS
    
    STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>'
    
    protocol, _ = split_protocol(STORAGE_PATH)
    print (protocol)
    
    storage_options = {
        'account_name': adls_account_name,
        'account_key': adls_account_key
    }
    fs = fsspec.filesystem(protocol, **storage_options)
    fs.put(
        './artifacts/output',
        STORAGE_PATH, 
        recursive=True, overwrite=True)
    
    # Register model MLFLOW artifacts in AML
    
    from azureml.core import Workspace, Model
    from azureml.core.authentication import ServicePrincipalAuthentication
    from notebookutils.mssparkutils import azureML
    
    AZURE_TENANT_ID = "xyz"
    AZURE_CLIENT_ID = "xyz"
    AZURE_CLIENT_SECRET = "xyz"
    
    AML_SUBSCRIPTION_ID = "xyz"
    AML_RESOURCE_GROUP = "xyz"
    AML_WORKSPACE_NAME = "xyz"
    
    #AML workspace authentication using linked service
    ws = azureML.getWorkspace("<linked_service_name>") #   "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked   service supports MSI and service principal both
    
    model = Model.register(
        model_path="./artifacts/output",
        model_name="xyz",
        workspace=ws,
    )
    
  4. Задайте необходимые параметры с помощью переменных.

    # If using ADLS uploaded model
    
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv"
    ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.windows.net/mlflow/sklearn/     e2e_linear_regression/"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
    # If using AML registered model
    
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv"
    AML_MODEL_URI_SKLEARN = "aml://xyz"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
  5. Включите функции SynapseML PREDICT в сеансе Spark.

    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  6. Привязка модели в сеансе Spark.

    # If using ADLS uploaded model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=ADLS_MODEL_URI_SKLEARN,
     ).register()
    
    # If using AML registered model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=AML_MODEL_URI_SKLEARN,
     aml_workspace=ws
     ).register()
    
  7. Загрузите данные для нагрузочного тестирования из ADLS.

    # Load data from ADLS
    
    df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .csv(DATA_FILE,
            inferSchema=True)
    df = df.select(df.columns[:9])
    df.createOrReplaceTempView('data')
    df.show(10)
    
  8. Вызовите PREDICT для генерации результата.

    # Call PREDICT
    
    predictions = spark.sql(
                      """
                          SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data
                      """
                  ).show()
    

Дальнейшие шаги