Учебник. Оценка моделей машинного обучения с помощью PREDICT в бессерверных пулах Apache Spark

Узнайте, как использовать функциональность PREDICT в бессерверных пулах Apache Spark в Azure Synapse Analytics для прогнозирования оценки. Вы можете использовать обученную модель, зарегистрированную в Машинном обучении Azure (AML) или в Azure Data Lake Storage по умолчанию (ADLS) в рабочей области Synapse.

PREDICT в записной книжке Synapse PySpark позволяет оценить модели машинного обучения с помощью языка SQL, определяемых пользователем функций (UDF) или преобразователей. С помощью PREDICT можно перенести доступные модели машинного обучения, обученные за пределами Synapse и зарегистрированные в Azure Data Lake Storage 2-го поколения или Машинном обучении Azure, чтобы оценить исторические данные в пределах защищенных границ Azure Synapse Analytics. Функция PREDICT принимает в качестве входных данных модель и данные. Эта возможность исключает необходимость в перемещении ценных данных за пределы Synapse для оценки. Цель состоит в том, чтобы помочь потребителям модели легко вычислять модели машинного обучения в Synapse, а также легко сотрудничать с производителями моделей, использующими правильную платформу для своих задач.

Из этого руководства вы узнаете, как выполнять следующие задачи:

  • Прогноз оценок для данных в бессерверном пуле Apache Spark с помощью моделей машинного обучения, которые обучены за пределами Synapse и зарегистрированы в Машинном обучение Azure или Azure Data Lake Storage 2-го поколения.

Если у вас еще нет подписки Azure, создайте бесплатную учетную запись, прежде чем начинать работу.

Предварительные требования

  • Рабочая область Azure Synapse Analytics с учетной записью хранения Azure Data Lake Storage 2-го поколения, настроенной в качестве хранилища по умолчанию. При работе с файловой системой Data Lake Storage 2-го поколения вам нужно иметь права участника для получения данных Хранилища BLOB-объектов.
  • Бессерверный пул Apache Spark в рабочей области Azure Synapse Analytics. Дополнительные сведения см. в статье Создание пула Spark в Azure Synapse.
  • Для обучения или регистрации модели в Машинном обучение Azure необходима рабочая область Машинного обучения Azure. Дополнительные сведения см. в статье Управление рабочими областями Машинного обучения Azure на портале или с помощью пакета SDK для Python.
  • Если ваша модель зарегистрирована в Машинном обучение Azure, вам потребуется связанная служба. В Azure Synapse Analytics связанная служба определяет сведения о подключении к службе. В этом учебнике описывается, как добавить Azure Synapse Analytics и Машинное обучение Azure в качестве связанных служб. Дополнительные сведения см. в статье Создание связанной службы Машинного обучения Azure в Synapse.
  • Для работы функциональности PREDICT необходимо, чтобы у вас уже была обученная модель, которая либо зарегистрирована в Машинном обучении Azure, либо передана в Azure Data Lake Storage 2-го поколения.

Примечание

  • Функциональность PREDICT поддерживается в бессерверном пуле Apache Spark для Spark3 в Azure Synapse Analytics. Python 3.8 — рекомендуемая версия для создания и обучения моделей.
  • PREDICT поддерживает большинство пакетов моделей машинного обучения в формате MLflow: TensorFlow, ONNX, PyTorch, SkLearn и pyfunc поддерживаются в этой предварительной версии.
  • PREDICT поддерживает источник модели AML и ADLS. Здесь учетная запись ADLS ссылается на учетную запись ADLS рабочей области Synapse по умолчанию.

Вход на портал Azure

Войдите на портал Azure.

Использование PREDICT для упакованных моделей MLFLOW

Перед выполнением этих действий для использования PREDICT убедитесь, что выполнены все предварительные требования.

  1. Импорт библиотек. Импортируйте следующие библиотеки, чтобы использовать PREDICT в сеансе Spark.

    #Import libraries
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
  2. Установка параметров с помощью переменных. Путь к данным Synapse ADLS и универсальный код ресурса (URI) модели необходимо задать с помощью входных переменных. Кроме того, необходимо определить среду выполнения, которая является mlflow, и тип данных возвращаемого результата модели. Обратите внимание, что все типы данных, поддерживаемые в PySpark, также поддерживаются с помощью PREDICT.

    Примечание

    Перед выполнением этого скрипта обновите его с помощью URI для файла данных ADLS 2-го поколения вместе с типом выходных данных модели и URI ADLS/AML для файла модели.

    #Set input data path
    DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<file path>"
    
    #Set model URI
        #Set AML URI, if trained model is registered in AML
           AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can   choose which model version you want to run. If ":x" is not provided then by default   latest version will be picked.
    
        #Set ADLS URI, if trained model is uploaded in ADLS
           ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<model   mlflow folder path>"
    
    #Define model return type
    RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported
    
    #Define model runtime. This supports only mlflow
    RUNTIME = "mlflow"
    
  3. Способы проверки подлинности рабочей области AML. Если модель хранится в учетной записи ADLS по умолчанию для рабочей области Synapse, то дальнейшая настройка проверки подлинности не требуется. Если модель зарегистрирована в Машинном обучении Azure, можно выбрать один из следующих двух поддерживаемых способов проверки подлинности.

    Примечание

    Перед запуском скрипта обновите сведения об арендаторе, клиенте, подписке, группе ресурсов, рабочей области AML и связанной службе в этом скрипте.

    • С помощью субъекта-службы. Идентификатор клиента субъекта-службы и секрет можно использовать непосредственно для проверки подлинности в рабочей области AML. Субъект-служба должен иметь доступ с правами участника к рабочей области AML.

      #AML workspace authentication using service principal
      AZURE_TENANT_ID = "<tenant_id>"
      AZURE_CLIENT_ID = "<client_id>"
      AZURE_CLIENT_SECRET = "<client_secret>"
      
      AML_SUBSCRIPTION_ID = "<subscription_id>"
      AML_RESOURCE_GROUP = "<resource_group_name>"
      AML_WORKSPACE_NAME = "<aml_workspace_name>"
      
      svc_pr = ServicePrincipalAuthentication( 
           tenant_id=AZURE_TENANT_ID,
           service_principal_id=AZURE_CLIENT_ID,
           service_principal_password=AZURE_CLIENT_SECRET
      )
      
      ws = Workspace(
           workspace_name = AML_WORKSPACE_NAME,
           subscription_id = AML_SUBSCRIPTION_ID,
           resource_group = AML_RESOURCE_GROUP,
           auth=svc_pr
      )
      
    • С помощью связанной службы. Для проверки подлинности в рабочей области AML можно использовать связанную службу. Для проверки подлинности связанная служба может использовать "субъект-службу" или "MSI" рабочей области Synapse. "Субъект-служба" или "MSI" должны иметь доступ с правами участника к рабочей области AML.

      #AML workspace authentication using linked service
      from notebookutils.mssparkutils import azureML
      ws = azureML.getWorkspace("<linked_service_name>") #   "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked   service supports MSI and service principal both
      
  4. Включение PREDICT в сеансе Spark. Для конфигурации Spark spark.synapse.ml.predict.enabled задайте значение true, чтобы включить библиотеку.

    #Enable SynapseML predict
    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  5. Привязка модели в сеансе Spark. Привяжите модель с необходимыми входными данными, чтобы на модель можно было ссылаться в сеансе Spark. Определите также псевдоним, чтобы этот псевдоним можно было использовать в вызове PREDICT.

    Примечание

    Обновите псевдоним модели и URI модели в этом скрипте перед запуском.

    #Bind model within Spark session
     model = pcontext.bind_model(
      return_types=RETURN_TYPES, 
      runtime=RUNTIME, 
      model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer  this   model
      model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI
      aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed
      ).register()
    
  6. Чтение данных из ADLS. Прочтите данные из ADLS. Создайте кадр данных Spark и представление поверх этого кадра.

    Примечание

    Обновите имя представления в этом скрипте перед его запуском.

    #Read data from ADLS
    df = spark.read \
     .format("csv") \
     .option("header", "true") \
     .csv(DATA_FILE,
         inferSchema=True)
    df.createOrReplaceTempView('<view_name>')
    
  7. Создание оценки с помощью PREDICT. Вы можете вызвать PREDICT тремя способами: с помощью API Spark SQL, UDF и API преобразователя. Ниже указаны примеры.

    Примечание

    Обновите имя псевдонима модели, имя представления и имя входного столбца модели с разделителями-запятыми в этом скрипте перед запуском. Входные столбцы модели с разделителями-запятыми совпадают с теми, которые используются при обучении модели.

    #Call PREDICT using Spark SQL API
    
    predictions = spark.sql(
                   """
                       SELECT PREDICT('<random_alias_name>',
                                 <comma_separated_model_input_column_name>) AS predict 
                       FROM <view_name>
                   """
               ).show()
    
    #Call PREDICT using user defined function (UDF)
    
    df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()
    
    #Call PREDICT using Transformer API
    
    columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    tranformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT")
    
    tranformer.transform(df).show()
    

Пример Sklearn с использованием PREDICT

  1. Импортируйте библиотеки и прочтите набор данных для обучения из ADLS.

    # Import libraries and read training dataset from ADLS
    
    import fsspec
    import pandas
    from fsspec.core import split_protocol
    
    adls_account_name = 'xyz' #Provide exact ADLS account name
    adls_account_key = 'xyz' #Provide exact ADLS account key
    
    fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>',      account_name=adls_account_name, account_key=adls_account_key)
    
    with fsspec_handle.open() as f:
        train_df = pandas.read_csv(f)
    
  2. Обучение модели и создание артефактов mlflow.

    # Train model and generate mlflow artifacts
    
    import os
    import shutil
    import mlflow
    import json
    from mlflow.utils import model_utils
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    
    
    class LinearRegressionModel():
      _ARGS_FILENAME = 'args.json'
      FEATURES_KEY = 'features'
      TARGETS_KEY = 'targets'
      TARGETS_PRED_KEY = 'targets_pred'
    
      def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1):
        self.fit_intercept = fit_intercept
        self.nb_input_features = nb_input_features
        self.nb_output_features = nb_output_features
    
      def get_args(self):
        args = {
            'nb_input_features': self.nb_input_features,
            'nb_output_features': self.nb_output_features,
            'fit_intercept': self.fit_intercept
        }
        return args
    
      def create_model(self):
        self.model = LinearRegression(fit_intercept=self.fit_intercept)
    
      def train(self, dataset):
    
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
    
        targets = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.TARGETS_KEY])], axis=0)
    
    
        self.model.fit(features, targets)
    
      def predict(self, dataset):
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
        targets_pred = self.model.predict(features)
        return targets_pred
    
      def save(self, path):
        if os.path.exists(path):
          shutil.rmtree(path)
    
        # save the sklearn model with mlflow
        mlflow.sklearn.save_model(self.model, path)
    
        # save args
        self._save_args(path)
    
      def _save_args(self, path):
        args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME)
        with open(args_filename, 'w') as f:
          args = self.get_args()
          json.dump(args, f)
    
    
    def train(train_df, output_model_path):
      print(f"Start to train LinearRegressionModel.")
    
      # Initialize input dataset
      dataset = train_df.to_numpy()
      datasets = {}
      datasets['targets'] = dataset[:, -1]
      datasets['features'] = dataset[:, :9]
    
      # Initialize model class obj
      model_class = LinearRegressionModel(fit_intercept=10)
      with mlflow.start_run(nested=True) as run:
        model_class.create_model()
        model_class.train(datasets)
        model_class.save(output_model_path)
        print(model_class.predict(datasets))
    
    
    train(train_df, './artifacts/output')
    
  3. Храните артефакты MLFLOW в ADLS или зарегистрируйтесь в AML.

    # Store model MLFLOW artifacts in ADLS
    
    STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>'
    
    protocol, _ = split_protocol(STORAGE_PATH)
    print (protocol)
    
    storage_options = {
        'account_name': adls_account_name,
        'account_key': adls_account_key
    }
    fs = fsspec.filesystem(protocol, **storage_options)
    fs.put(
        './artifacts/output',
        STORAGE_PATH, 
        recursive=True, overwrite=True)
    
    # Register model MLFLOW artifacts in AML
    
    from azureml.core import Workspace, Model
    from azureml.core.authentication import ServicePrincipalAuthentication
    
    AZURE_TENANT_ID = "xyz"
    AZURE_CLIENT_ID = "xyz"
    AZURE_CLIENT_SECRET = "xyz"
    
    AML_SUBSCRIPTION_ID = "xyz"
    AML_RESOURCE_GROUP = "xyz"
    AML_WORKSPACE_NAME = "xyz"
    
    svc_pr = ServicePrincipalAuthentication( 
        tenant_id=AZURE_TENANT_ID,
        service_principal_id=AZURE_CLIENT_ID,
        service_principal_password=AZURE_CLIENT_SECRET
    )
    
    ws = Workspace(
        workspace_name = AML_WORKSPACE_NAME,
        subscription_id = AML_SUBSCRIPTION_ID,
        resource_group = AML_RESOURCE_GROUP,
        auth=svc_pr
    )
    
    model = Model.register(
        model_path="./artifacts/output",
        model_name="xyz",
        workspace=ws,
    )
    
  4. Задайте необходимые параметры с помощью переменных.

    # If using ADLS uploaded model
    
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv"
    ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.windows.net/mlflow/sklearn/     e2e_linear_regression/"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
    # If using AML registered model
    
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv"
    AML_MODEL_URI_SKLEARN = "aml://xyz"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
  5. Включите функциональность SynapseML PREDICT в сеансе Spark.

    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  6. Привяжите модель в сеансе Spark.

    # If using ADLS uploaded model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=ADLS_MODEL_URI_SKLEARN,
     ).register()
    
    # If using AML registered model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=AML_MODEL_URI_SKLEARN,
     aml_workspace=ws
     ).register()
    
  7. Загрузите данные теста из ADLS.

    # Load data from ADLS
    
    df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .csv(DATA_FILE,
            inferSchema=True)
    df = df.select(df.columns[:9])
    df.createOrReplaceTempView('data')
    df.show(10)
    
  8. Вызовите PREDICT, чтобы создать оценку.

    # Call PREDICT
    
    predictions = spark.sql(
                      """
                          SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data
                      """
                  ).show()
    

Дальнейшие действия