Tutorial: pontuar modelos de machine learning com PREDICT em pools do Apache Spark sem servidor

Saiba como usar a funcionalidade PREDICT em pools do Apache Spark sem servidor no Azure Synapse Analytics para previsão de pontuação. Você pode usar um modelo treinado registrado no AML (Azure Machine Learning) ou no ADLS (Azure Data Lake Storage) padrão em seu workspace do Synapse.

A função PREDICT, em um notebook PySpark do Synapse, fornece a capacidade de pontuar modelos de machine learning usando a linguagem SQL, UDFs (funções definidas pelo usuário) ou transformadores. Com PREDICT, você pode resgatar seus modelos de machine learning existentes treinados fora do Synapse e registrados no Azure Data Lake Storage Gen2 ou no Azure Machine Learning para pontuar dados históricos dentro dos limites de segurança do Azure Synapse Analytics. A função PREDICT usa um modelo e dados como entradas. Esse recurso elimina a etapa de mover dados valiosos para fora do Synapse para pontuá-los. O objetivo é capacitar os consumidores de modelo para poderem inferir facilmente modelos de machine learning no Synapse, bem como colaborar perfeitamente com produtores de modelo trabalhando com a estrutura certa para a tarefa desses consumidores.

Neste tutorial, você aprenderá como:

  • Prever pontuações de dados em um pool do Apache Spark sem servidor usando modelos de machine learning treinados fora do Synapse e registrados no Azure Machine Learning ou no Azure Data Lake Storage Gen2.

Se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.

Pré-requisitos

  • Workspace do Azure Synapse Analytics com uma conta de armazenamento do Azure Data Lake Storage Gen2 configurada como o armazenamento padrão. Você precisa ser Colaborador de Dados do Storage Blob do sistema de arquivos Data Lake Storage Gen2 com o qual você trabalha.
  • Um pool do Apache Spark sem servidor no seu workspace do Azure Synapse Analytics. Para obter detalhes, confira Criar um Pool do Spark no Azure Synapse.
  • Um workspace do Azure Machine Learning será necessário se você quiser treinar ou registrar o modelo no Azure Machine Learning. Para obter detalhes, confira Gerenciar os workspaces do Azure Machine Learning no portal ou com o SDK do Python.
  • Se o seu modelo estiver registrado no Azure Machine Learning, você precisará de um serviço vinculado. No Azure Synapse Analytics, um serviço vinculado define as informações de conexão para o serviço. Neste tutorial, você adicionará um serviço vinculado do Azure Synapse Analytics e do Azure Machine Learning. Para saber mais, confira Criar um serviço vinculado do Azure Machine Learning no Azure Synapse.
  • A funcionalidade PREDICT requer que você já tenha um modelo treinado que seja registrado no Azure Machine Learning ou carregado no Azure Data Lake Storage Gen2.

Observação

  • O recurso PREDICT é compatível com o pool Spark3 do Apache Spark sem servidor no Azure Synapse Analytics. O Python 3.8 é uma versão recomendada para a criação e treinamento de modelos.
  • A função PREDICT dá suporte à maioria dos pacotes de modelos de machine learning no formato MLflow: TensorFlow, ONNX, PyTorch, SkLearn e pyfunc são compatíveis com esta versão prévia.
  • PREDICT dá suporte à origem do modelo do AML e do ADLS. Aqui, a conta do ADLS refere-se à conta padrão do ADLS do workspace do Synapse.

Entre no Portal do Azure

Entre no portal do Azure.

Usar PREDICT para modelos empacotados do MLFLOW

Verifique se todos os pré-requisitos foram atendidos antes de seguir estas etapas para o uso de PREDICT.

  1. Importar bibliotecas: importe as bibliotecas a seguir para usar PREDICT na sessão do Spark.

    #Import libraries
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
  2. Definir parâmetros usando variáveis: o caminho de dados de ADLS do Synapse e o URI do modelo precisam ser definidos usando variáveis de entrada. Você também precisa definir o runtime (que é "mlflow") e o tipo de dados de retorno de saída do modelo. Observe que todos os tipos de dados compatíveis com PySpark também são compatíveis por meio de PREDICT.

    Observação

    Antes de executar esse script, atualize-o com o URI para o arquivo de dados do ADLS Gen2, juntamente com o tipo de dados de retorno de saída de modelo e o URI de ADLS/AML para o arquivo de modelo.

    #Set input data path
    DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<file path>"
    
    #Set model URI
        #Set AML URI, if trained model is registered in AML
           AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can   choose which model version you want to run. If ":x" is not provided then by default   latest version will be picked.
    
        #Set ADLS URI, if trained model is uploaded in ADLS
           ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<model   mlflow folder path>"
    
    #Define model return type
    RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported
    
    #Define model runtime. This supports only mlflow
    RUNTIME = "mlflow"
    
  3. Maneiras de autenticar um workspace do AML: se o modelo for armazenado na conta padrão do ADLS do workspace do Synapse, você não precisará de nenhuma configuração de autenticação adicional. Se o modelo estiver registrado no Azure Machine Learning, você poderá escolher uma das duas maneiras de autenticação compatíveis.

    Observação

    Atualize o locatário, o cliente, a assinatura, o grupo de recursos, o workspace do AML e os detalhes do serviço vinculado neste script antes de executá-lo.

    • (Recomendado) Por meio do serviço vinculado: você pode usar o serviço vinculado para autenticar no espaço de trabalho AML. O serviço vinculado pode usar "entidade de serviço" ou "Identidade de Serviço Gerenciada (MSI)" do espaço de trabalho Synapse para autenticação. "Entidade de serviço" ou "MSI (Identidade de Serviço Gerenciada)" precisa ter acesso de "Colaborador" ao workspace do AML.

      #AML workspace authentication using linked service
      from notebookutils.mssparkutils import azureML
      ws = azureML.getWorkspace("<linked_service_name>") #   "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked   service supports MSI and service principal both
      
    • Por meio da entidade de serviço: embora não seja recomendado, você pode usar a ID do cliente e o segredo da entidade de serviço diretamente para autenticar no workspace AML. Fornecer a senha da entidade de serviço diretamente representa algum risco de segurança, portanto, sugerimos usar um serviço vinculado sempre que possível. A entidade de serviço precisa ter acesso de "Colaborador" ao workspace do AML.

      #AML workspace authentication using service principal
      AZURE_TENANT_ID = "<tenant_id>"
      AZURE_CLIENT_ID = "<client_id>"
      AZURE_CLIENT_SECRET = "<client_secret>"
      
      AML_SUBSCRIPTION_ID = "<subscription_id>"
      AML_RESOURCE_GROUP = "<resource_group_name>"
      AML_WORKSPACE_NAME = "<aml_workspace_name>"
      
      svc_pr = ServicePrincipalAuthentication( 
           tenant_id=AZURE_TENANT_ID,
           service_principal_id=AZURE_CLIENT_ID,
           service_principal_password=AZURE_CLIENT_SECRET
      )
      
      ws = Workspace(
           workspace_name = AML_WORKSPACE_NAME,
           subscription_id = AML_SUBSCRIPTION_ID,
           resource_group = AML_RESOURCE_GROUP,
           auth=svc_pr
      )
      
  4. Habilitar PREDICT na sessão do Spark: defina a configuração spark.synapse.ml.predict.enabled do Spark como true para habilitar a biblioteca.

    #Enable SynapseML predict
    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  5. Associar modelo na sessão do Spark: associe o modelo com entradas necessárias para que o modelo possa ser referenciado na sessão do Spark. Além disso, defina o alias para que você possa usar o mesmo alias na chamada a PREDICT.

    Observação

    Atualize o alias do modelo e o URI do modelo neste script antes de executá-lo.

    #Bind model within Spark session
     model = pcontext.bind_model(
      return_types=RETURN_TYPES, 
      runtime=RUNTIME, 
      model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer  this   model
      model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI
      aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed
      ).register()
    
  6. Ler dados do ADLS: leia dados do ADLS. Crie um dataframe do Spark e uma exibição na parte superior desse dataframe.

    Observação

    Atualize o nome da exibição neste script antes de executá-lo.

    #Read data from ADLS
    df = spark.read \
     .format("csv") \
     .option("header", "true") \
     .csv(DATA_FILE,
         inferSchema=True)
    df.createOrReplaceTempView('<view_name>')
    
  7. Gerar pontuação usando PREDICT: você pode chamar PREDICT de três maneiras – usando a API de SQL do Spark, usando a UDF (função definida pelo usuário) e usando a API do transformador. Confira os exemplos a seguir.

    Observação

    Atualize o nome do alias do modelo, o nome da exibição e o nome da coluna de entradas do modelo separadas por vírgula neste script antes de executá-lo. As colunas de entradas de modelo separadas por vírgulas são as mesmas usadas durante o treinamento do modelo.

    #Call PREDICT using Spark SQL API
    
    predictions = spark.sql(
                   """
                       SELECT PREDICT('<random_alias_name>',
                                 <comma_separated_model_input_column_name>) AS predict 
                       FROM <view_name>
                   """
               ).show()
    
    #Call PREDICT using user defined function (UDF)
    
    df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()
    
    #Call PREDICT using Transformer API
    
    columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    transformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT")
    
    transformer.transform(df).show()
    

Exemplo de Sklearn usando PREDICT

  1. Importe bibliotecas e leia o conjunto de dados de treinamento do ADLS.

    # Import libraries and read training dataset from ADLS
    
    import fsspec
    import pandas
    from fsspec.core import split_protocol
    
    adls_account_name = 'xyz' #Provide exact ADLS account name
    adls_account_key = 'xyz' #Provide exact ADLS account key
    
    fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>',      account_name=adls_account_name, account_key=adls_account_key)
    
    with fsspec_handle.open() as f:
        train_df = pandas.read_csv(f)
    
  2. Treine um modelo e gere artefatos mlflow.

    # Train model and generate mlflow artifacts
    
    import os
    import shutil
    import mlflow
    import json
    from mlflow.utils import model_utils
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    
    
    class LinearRegressionModel():
      _ARGS_FILENAME = 'args.json'
      FEATURES_KEY = 'features'
      TARGETS_KEY = 'targets'
      TARGETS_PRED_KEY = 'targets_pred'
    
      def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1):
        self.fit_intercept = fit_intercept
        self.nb_input_features = nb_input_features
        self.nb_output_features = nb_output_features
    
      def get_args(self):
        args = {
            'nb_input_features': self.nb_input_features,
            'nb_output_features': self.nb_output_features,
            'fit_intercept': self.fit_intercept
        }
        return args
    
      def create_model(self):
        self.model = LinearRegression(fit_intercept=self.fit_intercept)
    
      def train(self, dataset):
    
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
    
        targets = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.TARGETS_KEY])], axis=0)
    
    
        self.model.fit(features, targets)
    
      def predict(self, dataset):
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
        targets_pred = self.model.predict(features)
        return targets_pred
    
      def save(self, path):
        if os.path.exists(path):
          shutil.rmtree(path)
    
        # save the sklearn model with mlflow
        mlflow.sklearn.save_model(self.model, path)
    
        # save args
        self._save_args(path)
    
      def _save_args(self, path):
        args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME)
        with open(args_filename, 'w') as f:
          args = self.get_args()
          json.dump(args, f)
    
    
    def train(train_df, output_model_path):
      print(f"Start to train LinearRegressionModel.")
    
      # Initialize input dataset
      dataset = train_df.to_numpy()
      datasets = {}
      datasets['targets'] = dataset[:, -1]
      datasets['features'] = dataset[:, :9]
    
      # Initialize model class obj
      model_class = LinearRegressionModel(fit_intercept=10)
      with mlflow.start_run(nested=True) as run:
        model_class.create_model()
        model_class.train(datasets)
        model_class.save(output_model_path)
        print(model_class.predict(datasets))
    
    
    train(train_df, './artifacts/output')
    
  3. Armazene artefatos MLFLOW de modelo no ADLS ou registre-os no AML.

    # Store model MLFLOW artifacts in ADLS
    
    STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>'
    
    protocol, _ = split_protocol(STORAGE_PATH)
    print (protocol)
    
    storage_options = {
        'account_name': adls_account_name,
        'account_key': adls_account_key
    }
    fs = fsspec.filesystem(protocol, **storage_options)
    fs.put(
        './artifacts/output',
        STORAGE_PATH, 
        recursive=True, overwrite=True)
    
    # Register model MLFLOW artifacts in AML
    
    from azureml.core import Workspace, Model
    from azureml.core.authentication import ServicePrincipalAuthentication
    from notebookutils.mssparkutils import azureML
    
    AZURE_TENANT_ID = "xyz"
    AZURE_CLIENT_ID = "xyz"
    AZURE_CLIENT_SECRET = "xyz"
    
    AML_SUBSCRIPTION_ID = "xyz"
    AML_RESOURCE_GROUP = "xyz"
    AML_WORKSPACE_NAME = "xyz"
    
    #AML workspace authentication using linked service
    ws = azureML.getWorkspace("<linked_service_name>") #   "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked   service supports MSI and service principal both
    
    model = Model.register(
        model_path="./artifacts/output",
        model_name="xyz",
        workspace=ws,
    )
    
  4. Defina os parâmetros necessários usando variáveis.

    # If using ADLS uploaded model
    
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv"
    ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.windows.net/mlflow/sklearn/     e2e_linear_regression/"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
    # If using AML registered model
    
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv"
    AML_MODEL_URI_SKLEARN = "aml://xyz"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
  5. Habilite a funcionalidade PREDICT do SynapseML na sessão do Spark.

    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  6. Associar modelo na sessão do Spark.

    # If using ADLS uploaded model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=ADLS_MODEL_URI_SKLEARN,
     ).register()
    
    # If using AML registered model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=AML_MODEL_URI_SKLEARN,
     aml_workspace=ws
     ).register()
    
  7. Carregar dados de teste do ADLS.

    # Load data from ADLS
    
    df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .csv(DATA_FILE,
            inferSchema=True)
    df = df.select(df.columns[:9])
    df.createOrReplaceTempView('data')
    df.show(10)
    
  8. Chame PREDICT para gerar a pontuação.

    # Call PREDICT
    
    predictions = spark.sql(
                      """
                          SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data
                      """
                  ).show()
    

Próximas etapas