Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Saiba como usar a funcionalidade PREDICT em pools do Apache Spark sem servidor no Azure Synapse Analytics para previsão de pontuação. Você pode usar um modelo treinado registrado no AML (Azure Machine Learning) ou no ADLS (Azure Data Lake Storage) padrão em seu workspace do Synapse.
A função PREDICT, em um notebook PySpark do Synapse, fornece a capacidade de pontuar modelos de machine learning usando a linguagem SQL, UDFs (funções definidas pelo usuário) ou transformadores. Com PREDICT, você pode resgatar seus modelos de machine learning existentes treinados fora do Synapse e registrados no Azure Data Lake Storage Gen2 ou no Azure Machine Learning para pontuar dados históricos dentro dos limites de segurança do Azure Synapse Analytics. A função PREDICT usa um modelo e dados como entradas. Esse recurso elimina a etapa de mover dados valiosos para fora do Synapse para pontuá-los. O objetivo é capacitar os consumidores de modelo para poderem inferir facilmente modelos de machine learning no Synapse, bem como colaborar perfeitamente com produtores de modelo trabalhando com a estrutura certa para a tarefa desses consumidores.
Neste tutorial, você aprenderá como:
- Prever pontuações de dados em um pool do Apache Spark sem servidor usando modelos de machine learning treinados fora do Synapse e registrados no Azure Machine Learning ou no Azure Data Lake Storage Gen2.
Se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.
Pré-requisitos
- Workspace do Azure Synapse Analytics com uma conta de armazenamento do Azure Data Lake Storage Gen2 configurada como o armazenamento padrão. Você precisa ser Colaborador de Dados do Storage Blob do sistema de arquivos Data Lake Storage Gen2 com o qual você trabalha.
- Um pool do Apache Spark sem servidor no seu workspace do Azure Synapse Analytics. Para obter detalhes, confira Criar um Pool do Spark no Azure Synapse.
- Um workspace do Azure Machine Learning será necessário se você quiser treinar ou registrar o modelo no Azure Machine Learning. Para obter detalhes, confira Gerenciar os workspaces do Azure Machine Learning no portal ou com o SDK do Python.
- Se o seu modelo estiver registrado no Azure Machine Learning, você precisará de um serviço vinculado. No Azure Synapse Analytics, um serviço vinculado define as informações de conexão para o serviço. Neste tutorial, você adicionará um serviço vinculado do Azure Synapse Analytics e do Azure Machine Learning. Para saber mais, confira Criar um serviço vinculado do Azure Machine Learning no Azure Synapse.
- A funcionalidade PREDICT requer que você já tenha um modelo treinado que seja registrado no Azure Machine Learning ou carregado no Azure Data Lake Storage Gen2.
Observação
- O recurso PREDICT é compatível com o pool Spark3 do Apache Spark sem servidor no Azure Synapse Analytics. O Python 3.8 é uma versão recomendada para a criação e treinamento de modelos.
- A função PREDICT dá suporte à maioria dos pacotes de modelos de machine learning no formato MLflow: TensorFlow, ONNX, PyTorch, SkLearn e pyfunc são compatíveis com esta versão prévia.
- PREDICT dá suporte à origem do modelo do AML e do ADLS. Aqui, a conta do ADLS refere-se à conta padrão do ADLS do workspace do Synapse.
Entre no Portal do Azure
Entre no portal do Azure.
Usar PREDICT para modelos empacotados do MLFLOW
Verifique se todos os pré-requisitos foram atendidos antes de seguir estas etapas para o uso de PREDICT.
Importar bibliotecas: importe as bibliotecas a seguir para usar PREDICT na sessão do Spark.
#Import libraries from pyspark.sql.functions import col, pandas_udf,udf,lit from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_loggerDefinir parâmetros usando variáveis: o caminho de dados de ADLS do Synapse e o URI do modelo precisam ser definidos usando variáveis de entrada. Você também precisa definir o runtime (que é "mlflow") e o tipo de dados de retorno de saída do modelo. Observe que todos os tipos de dados compatíveis com PySpark também são compatíveis por meio de PREDICT.
Observação
Antes de executar esse script, atualize-o com o URI para o arquivo de dados do ADLS Gen2, juntamente com o tipo de dados de retorno de saída de modelo e o URI de ADLS/AML para o arquivo de modelo.
#Set input data path DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<file path>" #Set model URI #Set AML URI, if trained model is registered in AML AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can choose which model version you want to run. If ":x" is not provided then by default latest version will be picked. #Set ADLS URI, if trained model is uploaded in ADLS ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<model mlflow folder path>" #Define model return type RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported #Define model runtime. This supports only mlflow RUNTIME = "mlflow"Maneiras de autenticar um workspace do AML: se o modelo for armazenado na conta padrão do ADLS do workspace do Synapse, você não precisará de nenhuma configuração de autenticação adicional. Se o modelo estiver registrado no Azure Machine Learning, você poderá escolher uma das duas maneiras de autenticação compatíveis.
Observação
Atualize o locatário, o cliente, a assinatura, o grupo de recursos, o workspace do AML e os detalhes do serviço vinculado neste script antes de executá-lo.
(Recomendado) Por meio do serviço vinculado: você pode usar o serviço vinculado para autenticar no espaço de trabalho AML. O serviço vinculado pode usar "entidade de serviço" ou "Identidade de Serviço Gerenciada (MSI)" do espaço de trabalho Synapse para autenticação. "Entidade de serviço" ou "MSI (Identidade de Serviço Gerenciada)" precisa ter acesso de "Colaborador" ao workspace do AML.
#AML workspace authentication using linked service from notebookutils.mssparkutils import azureML ws = azureML.getWorkspace("<linked_service_name>") # "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked service supports MSI and service principal bothPor meio da entidade de serviço: embora não seja recomendado, você pode usar a ID do cliente e o segredo da entidade de serviço diretamente para autenticar no workspace AML. Fornecer a senha da entidade de serviço diretamente representa algum risco de segurança, portanto, sugerimos usar um serviço vinculado sempre que possível. A entidade de serviço precisa ter acesso de "Colaborador" ao workspace do AML.
#AML workspace authentication using service principal AZURE_TENANT_ID = "<tenant_id>" AZURE_CLIENT_ID = "<client_id>" AZURE_CLIENT_SECRET = "<client_secret>" AML_SUBSCRIPTION_ID = "<subscription_id>" AML_RESOURCE_GROUP = "<resource_group_name>" AML_WORKSPACE_NAME = "<aml_workspace_name>" svc_pr = ServicePrincipalAuthentication( tenant_id=AZURE_TENANT_ID, service_principal_id=AZURE_CLIENT_ID, service_principal_password=AZURE_CLIENT_SECRET ) ws = Workspace( workspace_name = AML_WORKSPACE_NAME, subscription_id = AML_SUBSCRIPTION_ID, resource_group = AML_RESOURCE_GROUP, auth=svc_pr )
Habilitar PREDICT na sessão do Spark: defina a configuração
spark.synapse.ml.predict.enableddo Spark comotruepara habilitar a biblioteca.#Enable SynapseML predict spark.conf.set("spark.synapse.ml.predict.enabled","true")Associar modelo na sessão do Spark: associe o modelo com entradas necessárias para que o modelo possa ser referenciado na sessão do Spark. Além disso, defina o alias para que você possa usar o mesmo alias na chamada a PREDICT.
Observação
Atualize o alias do modelo e o URI do modelo neste script antes de executá-lo.
#Bind model within Spark session model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer this model model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed ).register()Ler dados do ADLS: leia dados do ADLS. Crie um dataframe do Spark e uma exibição na parte superior desse dataframe.
Observação
Atualize o nome da exibição neste script antes de executá-lo.
#Read data from ADLS df = spark.read \ .format("csv") \ .option("header", "true") \ .csv(DATA_FILE, inferSchema=True) df.createOrReplaceTempView('<view_name>')Gerar pontuação usando PREDICT: você pode chamar PREDICT de três maneiras – usando a API de SQL do Spark, usando a UDF (função definida pelo usuário) e usando a API do transformador. Confira os exemplos a seguir.
Observação
Atualize o nome do alias do modelo, o nome da exibição e o nome da coluna de entradas do modelo separadas por vírgula neste script antes de executá-lo. As colunas de entradas de modelo separadas por vírgulas são as mesmas usadas durante o treinamento do modelo.
#Call PREDICT using Spark SQL API predictions = spark.sql( """ SELECT PREDICT('<random_alias_name>', <comma_separated_model_input_column_name>) AS predict FROM <view_name> """ ).show()#Call PREDICT using user defined function (UDF) df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"] df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()#Call PREDICT using Transformer API columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"] transformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT") transformer.transform(df).show()
Exemplo de Sklearn usando PREDICT
Importe bibliotecas e leia o conjunto de dados de treinamento do ADLS.
# Import libraries and read training dataset from ADLS import fsspec import pandas from fsspec.core import split_protocol adls_account_name = 'xyz' #Provide exact ADLS account name adls_account_key = 'xyz' #Provide exact ADLS account key fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>', account_name=adls_account_name, account_key=adls_account_key) with fsspec_handle.open() as f: train_df = pandas.read_csv(f)Treine um modelo e gere artefatos mlflow.
# Train model and generate mlflow artifacts import os import shutil import mlflow import json from mlflow.utils import model_utils import numpy as np import pandas as pd from sklearn.linear_model import LinearRegression class LinearRegressionModel(): _ARGS_FILENAME = 'args.json' FEATURES_KEY = 'features' TARGETS_KEY = 'targets' TARGETS_PRED_KEY = 'targets_pred' def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1): self.fit_intercept = fit_intercept self.nb_input_features = nb_input_features self.nb_output_features = nb_output_features def get_args(self): args = { 'nb_input_features': self.nb_input_features, 'nb_output_features': self.nb_output_features, 'fit_intercept': self.fit_intercept } return args def create_model(self): self.model = LinearRegression(fit_intercept=self.fit_intercept) def train(self, dataset): features = np.stack([sample for sample in iter( dataset[LinearRegressionModel.FEATURES_KEY])], axis=0) targets = np.stack([sample for sample in iter( dataset[LinearRegressionModel.TARGETS_KEY])], axis=0) self.model.fit(features, targets) def predict(self, dataset): features = np.stack([sample for sample in iter( dataset[LinearRegressionModel.FEATURES_KEY])], axis=0) targets_pred = self.model.predict(features) return targets_pred def save(self, path): if os.path.exists(path): shutil.rmtree(path) # save the sklearn model with mlflow mlflow.sklearn.save_model(self.model, path) # save args self._save_args(path) def _save_args(self, path): args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME) with open(args_filename, 'w') as f: args = self.get_args() json.dump(args, f) def train(train_df, output_model_path): print(f"Start to train LinearRegressionModel.") # Initialize input dataset dataset = train_df.to_numpy() datasets = {} datasets['targets'] = dataset[:, -1] datasets['features'] = dataset[:, :9] # Initialize model class obj model_class = LinearRegressionModel(fit_intercept=10) with mlflow.start_run(nested=True) as run: model_class.create_model() model_class.train(datasets) model_class.save(output_model_path) print(model_class.predict(datasets)) train(train_df, './artifacts/output')Armazene artefatos MLFLOW de modelo no ADLS ou registre-os no AML.
# Store model MLFLOW artifacts in ADLS STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>' protocol, _ = split_protocol(STORAGE_PATH) print (protocol) storage_options = { 'account_name': adls_account_name, 'account_key': adls_account_key } fs = fsspec.filesystem(protocol, **storage_options) fs.put( './artifacts/output', STORAGE_PATH, recursive=True, overwrite=True)# Register model MLFLOW artifacts in AML from azureml.core import Workspace, Model from azureml.core.authentication import ServicePrincipalAuthentication from notebookutils.mssparkutils import azureML AZURE_TENANT_ID = "xyz" AZURE_CLIENT_ID = "xyz" AZURE_CLIENT_SECRET = "xyz" AML_SUBSCRIPTION_ID = "xyz" AML_RESOURCE_GROUP = "xyz" AML_WORKSPACE_NAME = "xyz" #AML workspace authentication using linked service ws = azureML.getWorkspace("<linked_service_name>") # "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked service supports MSI and service principal both model = Model.register( model_path="./artifacts/output", model_name="xyz", workspace=ws, )Defina os parâmetros necessários usando variáveis.
# If using ADLS uploaded model import pandas as pd from pyspark.sql import SparkSession from pyspark.sql.functions import col, pandas_udf,udf,lit import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv" ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.windows.net/mlflow/sklearn/ e2e_linear_regression/" RETURN_TYPES = "INT" RUNTIME = "mlflow"# If using AML registered model from pyspark.sql.functions import col, pandas_udf,udf,lit from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv" AML_MODEL_URI_SKLEARN = "aml://xyz" RETURN_TYPES = "INT" RUNTIME = "mlflow"Habilite a funcionalidade PREDICT do SynapseML na sessão do Spark.
spark.conf.set("spark.synapse.ml.predict.enabled","true")Associar modelo na sessão do Spark.
# If using ADLS uploaded model model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="sklearn_linear_regression", model_uri=ADLS_MODEL_URI_SKLEARN, ).register()# If using AML registered model model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="sklearn_linear_regression", model_uri=AML_MODEL_URI_SKLEARN, aml_workspace=ws ).register()Carregar dados de teste do ADLS.
# Load data from ADLS df = spark.read \ .format("csv") \ .option("header", "true") \ .csv(DATA_FILE, inferSchema=True) df = df.select(df.columns[:9]) df.createOrReplaceTempView('data') df.show(10)Chame PREDICT para gerar a pontuação.
# Call PREDICT predictions = spark.sql( """ SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data """ ).show()