Wdrażanie i uruchamianie modeli MLflow w zadaniach platformy Spark

W tym artykule dowiesz się, jak wdrożyć i uruchomić model MLflow w zadaniach platformy Spark w celu wnioskowania na dużych ilościach danych lub w ramach zadań uzdatniania danych.

Informacje o tym przykładzie

W tym przykładzie pokazano, jak wdrożyć model MLflow zarejestrowany w usłudze Azure Machine Edukacja w zadaniach platformy Spark uruchomionych w zarządzanych klastrach Spark (wersja zapoznawcza), usłudze Azure Databricks lub Azure Synapse Analytics w celu przeprowadzenia wnioskowania na dużych ilościach danych.

Model jest oparty na zestawie danych choroby serca UCI. Baza danych zawiera 76 atrybutów, ale używamy podzestawu 14 z nich. Model próbuje przewidzieć obecność choroby serca u pacjenta. Jest to liczba całkowita z zakresu od 0 (brak obecności) do 1 (obecność). Został on wytrenowany przy użyciu XGBBoost klasyfikatora, a wszystkie wymagane przetwarzanie wstępne zostało spakowane jako potok, dzięki czemu ten model jest potokiem kompleksowego scikit-learn , który przechodzi od nieprzetworzonych danych do przewidywań.

Informacje przedstawione w tym artykule są oparte na przykładach kodu zawartych w repozytorium azureml-examples . Aby uruchomić polecenia lokalnie bez konieczności kopiowania/wklejania plików, sklonuj repozytorium, a następnie zmień katalogi na sdk/using-mlflow/deploy.

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

Wymagania wstępne

Przed wykonaniem kroków opisanych w tym artykule upewnij się, że masz następujące wymagania wstępne:

  • Zainstaluj pakiet mlflow zestawu MLflow SDK i wtyczkę usługi Azure Machine Edukacja dla platformy MLflowazureml-mlflow.

    pip install mlflow azureml-mlflow
    

    Napiwek

    Możesz użyć mlflow-skinny pakietu, który jest lekkim pakietem MLflow bez magazynu SQL, serwera, interfejsu użytkownika lub zależności nauki o danych. mlflow-skinny jest zalecany dla użytkowników, którzy potrzebują głównie możliwości śledzenia i rejestrowania MLflow bez importowania pełnego zestawu funkcji, w tym wdrożeń.

  • Obszar roboczy usługi Azure Machine Learning. Możesz go utworzyć, wykonując czynności opisane w samouczku Tworzenie zasobów uczenia maszynowego.

  • Jeśli wykonujesz zdalne śledzenie (czyli śledzenie eksperymentów uruchomionych poza usługą Azure Machine Edukacja), skonfiguruj rozwiązanie MLflow, aby wskazywało identyfikator URI śledzenia obszaru roboczego usługi Azure Machine Edukacja. Aby uzyskać więcej informacji na temat łączenia platformy MLflow z obszarem roboczym, zobacz Configure MLflow for Azure Machine Edukacja (Konfigurowanie platformy MLflow dla usługi Azure Machine Edukacja).

  • Musisz mieć model MLflow zarejestrowany w obszarze roboczym. W szczególności w tym przykładzie zostanie zarejestrowany model przeszkolony dla zestawu danych Diabetes.

Nawiązywanie połączenia z obszarem roboczym

Najpierw nawiążmy połączenie z usługą Azure Machine Edukacja obszarem roboczym, w którym zarejestrowano model.

Śledzenie jest już skonfigurowane. Domyślne poświadczenia będą również używane podczas pracy z platformą MLflow.

Rejestrowanie modelu

Do wnioskowania potrzebujemy modelu zarejestrowanego w rejestrze usługi Azure Machine Edukacja. W tym przypadku mamy już lokalną kopię modelu w repozytorium, więc musimy opublikować model tylko w rejestrze w obszarze roboczym. Ten krok można pominąć, jeśli model, który próbujesz wdrożyć, jest już zarejestrowany.

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

Alternatywnie, jeśli model został zarejestrowany wewnątrz przebiegu, możesz zarejestrować go bezpośrednio.

Napiwek

Aby zarejestrować model, musisz znać lokalizację, w której został zapisany model. Jeśli używasz autolog funkcji MLflow, ścieżka będzie zależeć od typu i struktury używanego modelu. Zalecamy sprawdzenie danych wyjściowych zadań w celu zidentyfikowania nazwy tego folderu. Możesz wyszukać folder zawierający plik o nazwie MLModel. Jeśli rejestrujesz modele ręcznie przy użyciu metody log_model, ścieżka jest argumentem przekazywanym do takiej metody. Jeśli na przykład rejestrujesz model przy użyciu mlflow.sklearn.log_model(my_model, "classifier")metody , ścieżka, w której jest przechowywany model, to classifier.

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

Uwaga

Ścieżka MODEL_PATH to lokalizacja, w której model został zapisany w przebiegu.


Pobieranie danych wejściowych w celu oceny

Do uruchomienia lub zadań potrzebnych będzie kilka danych wejściowych. W tym przykładzie pobierzemy przykładowe dane z Internetu i umieścimy je w udostępnionym magazynie używanym przez klaster Spark.

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

Przenieś dane do zainstalowanego konta magazynu dostępnego dla całego klastra.

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

Ważne

W poprzednim kodzie jest używane dbutilsnarzędzie dostępne w klastrze usługi Azure Databricks. Użyj odpowiedniego narzędzia w zależności od używanej platformy.

Dane wejściowe są następnie umieszczane w następującym folderze:

input_data_path = "dbfs:/data"

Uruchamianie modelu w klastrach Spark

W poniższej sekcji wyjaśniono, jak uruchamiać modele MLflow zarejestrowane w usłudze Azure Machine Edukacja w zadaniach platformy Spark.

  1. Upewnij się, że w klastrze są zainstalowane następujące biblioteki:

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. Użyjemy notesu, aby zademonstrować sposób tworzenia procedury oceniania za pomocą modelu MLflow zarejestrowanego w usłudze Azure Machine Edukacja. Utwórz notes i użyj narzędzia PySpark jako języka domyślnego.

  3. Zaimportuj wymagane przestrzenie nazw:

    import mlflow
    import pyspark.sql.functions as f
    
  4. Skonfiguruj identyfikator URI modelu. Poniższy identyfikator URI wprowadza model o nazwie heart-classifier w najnowszej wersji.

    model_uri = "models:/heart-classifier/latest"
    
  5. Załaduj model jako funkcję UDF. Funkcja zdefiniowana przez użytkownika (UDF) jest funkcją zdefiniowaną przez użytkownika, umożliwiając ponowne użycie logiki niestandardowej w środowisku użytkownika.

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    Napiwek

    Użyj argumentu result_type , aby kontrolować typ zwracany przez predict() funkcję.

  6. Przeczytaj dane, które chcesz ocenić:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    W naszym przypadku dane wejściowe są w CSV formacie i umieszczane w folderze dbfs:/data/. Usuwamy również kolumnę target , ponieważ ten zestaw danych zawiera zmienną docelową do przewidzenia. W scenariuszach produkcyjnych dane nie będą miały tej kolumny.

  7. Uruchom funkcję predict_function i umieść przewidywania w nowej kolumnie. W tym przypadku umieszczamy przewidywania w kolumnie predictions.

    df.withColumn("predictions", score_function(*df.columns))
    

    Napiwek

    Odebrane predict_function jako argumenty wymagane kolumny. W naszym przypadku wszystkie kolumny ramki danych są oczekiwane przez model i dlatego df.columns są używane. Jeśli model wymaga podzbioru kolumn, możesz wprowadzić je ręcznie. Jeśli model ma podpis, typy muszą być zgodne między danymi wejściowymi i oczekiwanymi typami.

  8. Przewidywania można zapisywać z powrotem do magazynu:

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

Uruchamianie modelu w autonomicznym zadaniu spark w usłudze Azure Machine Edukacja

Usługa Azure Machine Edukacja obsługuje tworzenie autonomicznego zadania platformy Spark oraz tworzenie składnika platformy Spark wielokrotnego użytku, który może być używany w potokach usługi Azure Machine Edukacja. W tym przykładzie wdrożymy zadanie oceniania uruchamiane w usłudze Azure Machine Edukacja autonomicznym zadaniu platformy Spark i uruchamiamy model MLflow w celu przeprowadzenia wnioskowania.

Uwaga

Aby dowiedzieć się więcej o zadaniach platformy Spark w usłudze Azure Machine Edukacja, zobacz Submit Spark jobs in Azure Machine Edukacja (wersja zapoznawcza).

  1. Zadanie platformy Spark wymaga skryptu języka Python, który przyjmuje argumenty. Utwórz skrypt oceniania:

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    Powyższy skrypt przyjmuje trzy argumenty --model, --input_data i --scored_data. Pierwsze dwa to dane wejściowe i reprezentują model, który chcemy uruchomić, a dane wejściowe, ostatni to dane wyjściowe i jest to folder wyjściowy, w którym zostaną umieszczone przewidywania.

    Napiwek

    Instalacja pakietów języka Python: poprzedni skrypt oceniania ładuje model MLflow do funkcji UDF, ale wskazuje parametr env_manager="conda". Po ustawieniu tego parametru narzędzie MLflow przywróci wymagane pakiety zgodnie z definicją modelu w izolowanym środowisku, w którym jest uruchamiana tylko funkcja UDF. Aby uzyskać więcej informacji, zobacz mlflow.pyfunc.spark_udf dokumentację.

  2. Utwórz definicję zadania:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    Napiwek

    Aby użyć dołączonej puli platformy Synapse Spark, zdefiniuj compute właściwość w przykładowym pliku specyfikacji YAML pokazanym powyżej zamiast resources właściwości.

  3. Pliki YAML pokazane powyżej mogą być używane w az ml job create poleceniu z parametrem --file w celu utworzenia autonomicznego zadania platformy Spark, jak pokazano poniżej:

    az ml job create -f mlflow-score-spark-job.yml
    

Następne kroki