Sdílet prostřednictvím


Kurz: Skóre modelů strojového učení pomocí funkce PREDICT v bezserverových fondech Apache Sparku

Naučte se používat funkce PREDICT v bezserverových fondech Apache Sparku v Azure Synapse Analytics k predikci skóre. Vytrénovaný model zaregistrovaný ve službě Azure Machine Learning (AML) nebo ve výchozím Azure Data Lake Storage (ADLS) můžete použít v pracovním prostoru Synapse.

Funkce PREDICT v poznámkovém bloku Synapse PySpark poskytuje možnost ohodnotit modely strojového učení pomocí jazyka SQL, uživatelem definovaných funkcí (UDF) nebo Transformátorů. S funkcí PREDICT můžete přenést stávající modely strojového učení natrénované mimo Synapse a zaregistrované v Azure Data Lake Storage Gen2 nebo Azure Machine Learning a ohodnotit historická data v rámci zabezpečených hranic Azure Synapse Analytics. Funkce PREDICT přebírá model a data jako vstupy. Tato funkce eliminuje krok přesunu cenných dat mimo Synapse pro vyhodnocování. Cílem je umožnit uživatelům modelů snadno odvodit modely strojového učení v Synapse a bezproblémově spolupracovat s producenty modelů, kteří pracují se správnou architekturou pro jejich úlohu.

V tomto kurzu se naučíte:

  • Predikce skóre pro data v bezserverovém fondu Apache Sparku pomocí modelů strojového učení, které jsou natrénované mimo Synapse a zaregistrované ve službě Azure Machine Learning nebo Azure Data Lake Storage Gen2.

Pokud nemáte předplatné Azure, vytvořte si před zahájením bezplatného účtu.

Požadavky

  • Azure Synapse pracovní prostor Analytics s Azure Data Lake Storage Gen2 účtem úložiště nakonfigurovaným jako výchozí úložiště. Musíte být přispěvatelem dat objektů blob služby Storage Data Lake Storage Gen2 systému souborů, se kterým pracujete.
  • Bezserverový fond Apache Sparku v pracovním prostoru Azure Synapse Analytics. Podrobnosti najdete v tématu Vytvoření fondu Sparku v Azure Synapse.
  • Pracovní prostor Azure Machine Learning je potřeba, pokud chcete trénovat nebo registrovat model ve službě Azure Machine Learning. Podrobnosti najdete v tématu Správa pracovních prostorů Služby Azure Machine Learning na portálu nebo pomocí sady Python SDK.
  • Pokud je váš model zaregistrovaný ve službě Azure Machine Learning, potřebujete propojenou službu. V Azure Synapse Analytics definuje propojená služba informace o připojení ke službě. V tomto kurzu přidáte propojenou službu Azure Synapse Analytics a Azure Machine Learning. Další informace najdete v tématu Vytvoření nové propojené služby Azure Machine Learning v Synapse.
  • Funkce PREDICT vyžaduje, abyste už měli natrénovaný model, který je zaregistrovaný ve službě Azure Machine Learning nebo nahraný v Azure Data Lake Storage Gen2.

Poznámka

  • Funkce PREDICT je podporovaná v bezserverovém fondu Apache Sparku Spark3 v Azure Synapse Analytics. Python 3.8 je doporučená verze pro vytváření a trénování modelů.
  • Predict podporuje většinu balíčků modelů strojového učení ve formátu MLflow : Tato verze Preview podporuje TensorFlow, ONNX, PyTorch, SkLearn a pyfunc .
  • FUNKCE PREDICT podporuje zdroj modelů AML a ADLS . Tady účet ADLS odkazuje na výchozí účet ADLS pracovního prostoru Synapse.

Přihlášení k webu Azure Portal

Přihlaste se k webu Azure Portal.

Použití funkce PREDICT pro modely s balíčky MLFLOW

Před použitím funkce PREDICT se ujistěte, že jsou splněné všechny požadavky.

  1. Import knihoven: Pokud chcete v relaci Sparku použít funkci PREDICT, naimportujte následující knihovny.

    #Import libraries
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
  2. Nastavení parametrů pomocí proměnných: Cestu k datům Synapse ADLS a identifikátor URI modelu je potřeba nastavit pomocí vstupních proměnných. Musíte také definovat modul runtime, což je "mlflow" a datový typ návratu výstupu modelu. Upozorňujeme, že funkce PREDICT podporuje také všechny datové typy podporované v PySparku.

    Poznámka

    Před spuštěním tohoto skriptu ho aktualizujte pomocí identifikátoru URI pro datový soubor ADLS Gen2 spolu s výstupním datovým typem modelu a identifikátorem URI ADLS/AML pro soubor modelu.

    #Set input data path
    DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<file path>"
    
    #Set model URI
        #Set AML URI, if trained model is registered in AML
           AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can   choose which model version you want to run. If ":x" is not provided then by default   latest version will be picked.
    
        #Set ADLS URI, if trained model is uploaded in ADLS
           ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.windows.net/<model   mlflow folder path>"
    
    #Define model return type
    RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported
    
    #Define model runtime. This supports only mlflow
    RUNTIME = "mlflow"
    
  3. Způsoby ověřování pracovního prostoru AML: Pokud je model uložený ve výchozím účtu ADLS pracovního prostoru Synapse, nepotřebujete žádné další nastavení ověřování. Pokud je model zaregistrovaný ve službě Azure Machine Learning, můžete zvolit některý z následujících dvou podporovaných způsobů ověřování.

    Poznámka

    Před spuštěním tohoto skriptu aktualizujte podrobnosti o tenantovi, klientovi, předplatném, skupině prostředků, pracovním prostoru AML a propojené službě.

    • Prostřednictvím instančního objektu: K ověření v pracovním prostoru AML můžete přímo použít ID klienta instančního objektu a tajný klíč. Instanční objekt musí mít přístup Přispěvatel k pracovnímu prostoru AML.

      #AML workspace authentication using service principal
      AZURE_TENANT_ID = "<tenant_id>"
      AZURE_CLIENT_ID = "<client_id>"
      AZURE_CLIENT_SECRET = "<client_secret>"
      
      AML_SUBSCRIPTION_ID = "<subscription_id>"
      AML_RESOURCE_GROUP = "<resource_group_name>"
      AML_WORKSPACE_NAME = "<aml_workspace_name>"
      
      svc_pr = ServicePrincipalAuthentication( 
           tenant_id=AZURE_TENANT_ID,
           service_principal_id=AZURE_CLIENT_ID,
           service_principal_password=AZURE_CLIENT_SECRET
      )
      
      ws = Workspace(
           workspace_name = AML_WORKSPACE_NAME,
           subscription_id = AML_SUBSCRIPTION_ID,
           resource_group = AML_RESOURCE_GROUP,
           auth=svc_pr
      )
      
    • Prostřednictvím propojené služby: K ověření v pracovním prostoru AML můžete použít propojenou službu. Propojená služba může k ověřování používat instanční objekt nebo identitu spravované služby (MSI) pracovního prostoru Synapse. Instanční objekt nebo identita spravované služby (MSI) musí mít přístup přispěvatele k pracovnímu prostoru AML.

      #AML workspace authentication using linked service
      from notebookutils.mssparkutils import azureML
      ws = azureML.getWorkspace("<linked_service_name>") #   "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked   service supports MSI and service principal both
      
  4. Povolení funkce PREDICT v relaci Sparku: Pokud chcete povolit knihovnu, nastavte konfiguraci spark.synapse.ml.predict.enabledtrue Sparku na hodnotu .

    #Enable SynapseML predict
    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  5. Vytvoření vazby modelu v relaci Sparku: Vytvořte vazbu modelu s požadovanými vstupy, aby se na model mohl odkazovat v relaci Sparku. Definujte také alias, abyste mohli použít stejný alias ve volání PREDICT.

    Poznámka

    Před spuštěním tohoto skriptu aktualizujte alias modelu a identifikátor URI modelu.

    #Bind model within Spark session
     model = pcontext.bind_model(
      return_types=RETURN_TYPES, 
      runtime=RUNTIME, 
      model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer  this   model
      model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI
      aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed
      ).register()
    
  6. Čtení dat z ADLS: Čtení dat z ADLS Vytvořte datový rámec Sparku a zobrazení nad datovým rámcem.

    Poznámka

    Před spuštěním tohoto skriptu aktualizujte název zobrazení.

    #Read data from ADLS
    df = spark.read \
     .format("csv") \
     .option("header", "true") \
     .csv(DATA_FILE,
         inferSchema=True)
    df.createOrReplaceTempView('<view_name>')
    
  7. Generování skóre pomocí funkce PREDICT: Funkci PREDICT můžete volat třemi způsoby: pomocí rozhraní SPARK SQL API, pomocí funkce UDF (User define Function) a pomocí rozhraní Transformer API. Tady jsou příklady.

    Poznámka

    Před spuštěním tohoto skriptu aktualizujte název aliasu modelu, název zobrazení a název vstupního sloupce modelu oddělený čárkami. Vstupní sloupce modelu oddělené čárkami jsou stejné jako sloupce použité při trénování modelu.

    #Call PREDICT using Spark SQL API
    
    predictions = spark.sql(
                   """
                       SELECT PREDICT('<random_alias_name>',
                                 <comma_separated_model_input_column_name>) AS predict 
                       FROM <view_name>
                   """
               ).show()
    
    #Call PREDICT using user defined function (UDF)
    
    df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()
    
    #Call PREDICT using Transformer API
    
    columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    tranformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT")
    
    tranformer.transform(df).show()
    

Příklad Sklearnu s využitím funkce PREDICT

  1. Importujte knihovny a přečtěte si trénovací datovou sadu z ADLS.

    # Import libraries and read training dataset from ADLS
    
    import fsspec
    import pandas
    from fsspec.core import split_protocol
    
    adls_account_name = 'xyz' #Provide exact ADLS account name
    adls_account_key = 'xyz' #Provide exact ADLS account key
    
    fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>',      account_name=adls_account_name, account_key=adls_account_key)
    
    with fsspec_handle.open() as f:
        train_df = pandas.read_csv(f)
    
  2. Trénování modelu a generování artefaktů mlflow

    # Train model and generate mlflow artifacts
    
    import os
    import shutil
    import mlflow
    import json
    from mlflow.utils import model_utils
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    
    
    class LinearRegressionModel():
      _ARGS_FILENAME = 'args.json'
      FEATURES_KEY = 'features'
      TARGETS_KEY = 'targets'
      TARGETS_PRED_KEY = 'targets_pred'
    
      def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1):
        self.fit_intercept = fit_intercept
        self.nb_input_features = nb_input_features
        self.nb_output_features = nb_output_features
    
      def get_args(self):
        args = {
            'nb_input_features': self.nb_input_features,
            'nb_output_features': self.nb_output_features,
            'fit_intercept': self.fit_intercept
        }
        return args
    
      def create_model(self):
        self.model = LinearRegression(fit_intercept=self.fit_intercept)
    
      def train(self, dataset):
    
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
    
        targets = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.TARGETS_KEY])], axis=0)
    
    
        self.model.fit(features, targets)
    
      def predict(self, dataset):
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
        targets_pred = self.model.predict(features)
        return targets_pred
    
      def save(self, path):
        if os.path.exists(path):
          shutil.rmtree(path)
    
        # save the sklearn model with mlflow
        mlflow.sklearn.save_model(self.model, path)
    
        # save args
        self._save_args(path)
    
      def _save_args(self, path):
        args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME)
        with open(args_filename, 'w') as f:
          args = self.get_args()
          json.dump(args, f)
    
    
    def train(train_df, output_model_path):
      print(f"Start to train LinearRegressionModel.")
    
      # Initialize input dataset
      dataset = train_df.to_numpy()
      datasets = {}
      datasets['targets'] = dataset[:, -1]
      datasets['features'] = dataset[:, :9]
    
      # Initialize model class obj
      model_class = LinearRegressionModel(fit_intercept=10)
      with mlflow.start_run(nested=True) as run:
        model_class.create_model()
        model_class.train(datasets)
        model_class.save(output_model_path)
        print(model_class.predict(datasets))
    
    
    train(train_df, './artifacts/output')
    
  3. Uložte artefakty MLFLOW modelu v ADLS nebo zaregistrujte v AML.

    # Store model MLFLOW artifacts in ADLS
    
    STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>'
    
    protocol, _ = split_protocol(STORAGE_PATH)
    print (protocol)
    
    storage_options = {
        'account_name': adls_account_name,
        'account_key': adls_account_key
    }
    fs = fsspec.filesystem(protocol, **storage_options)
    fs.put(
        './artifacts/output',
        STORAGE_PATH, 
        recursive=True, overwrite=True)
    
    # Register model MLFLOW artifacts in AML
    
    from azureml.core import Workspace, Model
    from azureml.core.authentication import ServicePrincipalAuthentication
    
    AZURE_TENANT_ID = "xyz"
    AZURE_CLIENT_ID = "xyz"
    AZURE_CLIENT_SECRET = "xyz"
    
    AML_SUBSCRIPTION_ID = "xyz"
    AML_RESOURCE_GROUP = "xyz"
    AML_WORKSPACE_NAME = "xyz"
    
    svc_pr = ServicePrincipalAuthentication( 
        tenant_id=AZURE_TENANT_ID,
        service_principal_id=AZURE_CLIENT_ID,
        service_principal_password=AZURE_CLIENT_SECRET
    )
    
    ws = Workspace(
        workspace_name = AML_WORKSPACE_NAME,
        subscription_id = AML_SUBSCRIPTION_ID,
        resource_group = AML_RESOURCE_GROUP,
        auth=svc_pr
    )
    
    model = Model.register(
        model_path="./artifacts/output",
        model_name="xyz",
        workspace=ws,
    )
    
  4. Nastavte požadované parametry pomocí proměnných.

    # If using ADLS uploaded model
    
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv"
    ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.windows.net/mlflow/sklearn/     e2e_linear_regression/"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
    # If using AML registered model
    
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.windows.net/xyz.csv"
    AML_MODEL_URI_SKLEARN = "aml://xyz"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
  5. Povolte funkci SynapseML PREDICT v relaci Sparku.

    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  6. Vytvoření vazby modelu v relaci Sparku

    # If using ADLS uploaded model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=ADLS_MODEL_URI_SKLEARN,
     ).register()
    
    # If using AML registered model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=AML_MODEL_URI_SKLEARN,
     aml_workspace=ws
     ).register()
    
  7. Zátěžová testovací data z ADLS.

    # Load data from ADLS
    
    df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .csv(DATA_FILE,
            inferSchema=True)
    df = df.select(df.columns[:9])
    df.createOrReplaceTempView('data')
    df.show(10)
    
  8. Zavolejte PREDICT a vygenerujte skóre.

    # Call PREDICT
    
    predictions = spark.sql(
                      """
                          SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data
                      """
                  ).show()
    

Další kroky