Spark işlerinde MLflow modellerini dağıtma ve çalıştırma
Bu makalede, büyük miktarda veriden çıkarım yapmak veya veri düzenleme işlerinin bir parçası olarak MLflow modelinizi Spark işlerinde dağıtmayı ve çalıştırmayı öğrenin.
Bu örnek hakkında
Bu örnekte, azure machine learning'de kayıtlı bir MLflow modelini yönetilen Spark kümelerinde (önizleme), Azure Databricks'te veya Azure Synapse Analytics'te çalıştırılan Spark işlerine dağıtarak büyük miktarda veriden çıkarım gerçekleştirebileceğiniz gösterilmektedir.
Model, UCI Kalp Hastalığı Veri Kümesini temel alır. Veritabanı 76 öznitelik içeriyor, ancak bunların 14'ünün alt kümesini kullanıyoruz. Model, bir hastada kalp hastalığının varlığını tahmin etmeye çalışır. 0 'dan (iletişim durumu yok) 1'e (iletişim durumu) değerlenen tamsayıdır. Sınıflandırıcı kullanılarak XGBBoost
eğitilmiştir ve gerekli tüm ön işleme işlem scikit-learn
hattı olarak paketlenmiştir ve bu da modeli ham verilerden tahminlere giden uçtan uca bir işlem hattı haline getirmiştir.
Bu makaledeki bilgiler, azureml-examples deposunda yer alan kod örneklerini temel alır. Dosyaları kopyalamak/yapıştırmak zorunda kalmadan komutları yerel olarak çalıştırmak için depoyu kopyalayın ve dizinleri olarak sdk/using-mlflow/deploy
değiştirin.
git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy
Önkoşullar
Bu makaledeki adımları takip etmeden önce aşağıdaki önkoşullara sahip olduğunuzdan emin olun:
MLflow SDK paketini
mlflow
ve MLflow için Azure Machine Learningazureml-mlflow
eklentisini yükleyin:pip install mlflow azureml-mlflow
İpucu
SQL depolama, sunucu, kullanıcı arabirimi veya veri bilimi bağımlılıkları olmadan basit bir MLflow paketi olan paketini kullanabilirsiniz
mlflow-skinny
.mlflow-skinny
dağıtımlar dahil olmak üzere tüm özellik paketini içeri aktarmadan öncelikle MLflow izleme ve günlüğe kaydetme özelliklerine ihtiyaç duyan kullanıcılar için önerilir.Azure Machine Learning çalışma alanı. Çalışma alanı oluşturmak için Makine öğrenmesi kaynakları oluşturma öğreticisine bakın. Çalışma alanınızda MLflow işlemlerinizi gerçekleştirmek için ihtiyacınız olan erişim izinlerini gözden geçirin.
Uzaktan izleme gerçekleştirirseniz (yani Azure Machine Learning dışında çalışan denemeleri izleyin), MLflow'ı Azure Machine Learning çalışma alanınızın izleme URI'sine işaret eden şekilde yapılandırın. MLflow'u çalışma alanınıza bağlama hakkında daha fazla bilgi için bkz . Azure Machine Learning için MLflow'u yapılandırma.
- Çalışma alanınızda kayıtlı bir MLflow modeli olmalıdır. Özellikle bu örnekte Diabetes veri kümesi için eğitilmiş bir model kaydedilecektir.
Çalışma alanınıza bağlanma
İlk olarak, modelinizin kayıtlı olduğu Azure Machine Learning çalışma alanına bağlanalım.
İzleme sizin için zaten yapılandırılmış. MLflow ile çalışırken varsayılan kimlik bilgileriniz de kullanılır.
Modeli kaydetme
Çıkarım gerçekleştirmek için Azure Machine Learning kayıt defterine kaydedilmiş bir modele ihtiyacımız var. Bu durumda, depoda modelin yerel bir kopyası zaten var, bu nedenle modeli yalnızca çalışma alanında kayıt defterinde yayımlamamız gerekiyor. Dağıtmaya çalıştığınız model zaten kayıtlıysa bu adımı atlayabilirsiniz.
model_name = 'heart-classifier'
model_local_path = "model"
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version
Alternatif olarak, modeliniz çalıştırmanın içinde günlüğe kaydedildiyse doğrudan kaydedebilirsiniz.
İpucu
Modeli kaydetmek için modelin depolandığı konumu bilmeniz gerekir. MLflow özelliğini kullanıyorsanız autolog
yol, kullanılan modelin türüne ve çerçevesine bağlıdır. Bu klasörün adını belirlemek için işlerin çıkışını denetlemenizi öneririz. adlı MLModel
bir dosya içeren klasörü arayabilirsiniz. kullanarak modellerinizi el ile log_model
günlüğe günlüğe geçiriyorsanız yol, bu tür bir yönteme geçirdiğiniz bağımsız değişkendir. Örneğin, kullanarak mlflow.sklearn.log_model(my_model, "classifier")
modeli günlüğe kaydederseniz, modelin depolandığı yol olur classifier
.
model_name = 'heart-classifier'
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version
Not
Yol MODEL_PATH
, modelin çalıştırmada depolandığı konumdur.
Puan almak için giriş verilerini alma
Üzerinde çalışmak veya işleri yapmak için bazı giriş verilerine ihtiyacımız olacak. Bu örnekte, örnek verileri İnternet'ten indireceğiz ve Spark kümesi tarafından kullanılan paylaşılan bir depolama alanına yerleştireceğiz.
import urllib
urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")
Verileri kümenin tamamında kullanılabilen bağlı bir depolama hesabına taşıyın.
dbutils.fs.mv("file:/tmp/data", "dbfs:/")
Önemli
Önceki kod, dbutils
Azure Databricks kümesinde kullanılabilen bir araçtır. Kullandığınız platforma bağlı olarak uygun aracı kullanın.
Giriş verileri daha sonra aşağıdaki klasöre yerleştirilir:
input_data_path = "dbfs:/data"
Spark kümelerinde modeli çalıştırma
Aşağıdaki bölümde, Spark işlerinde Azure Machine Learning'de kayıtlı MLflow modellerini çalıştırma işlemi açıklanmaktadır.
Kümede aşağıdaki kitaplıkların yüklü olduğundan emin olun:
- mlflow<3,>=2.1 - cloudpickle==2.2.0 - scikit-learn==1.2.0 - xgboost==1.7.2
Azure Machine Learning'de kayıtlı bir MLflow modeliyle puanlama yordamı oluşturmayı göstermek için bir not defteri kullanacağız. Not defteri oluşturun ve pyspark'ı varsayılan dil olarak kullanın.
Gerekli ad alanlarını içeri aktarın:
import mlflow import pyspark.sql.functions as f
Model URI'sini yapılandırın. Aşağıdaki URI, en son sürümünde adlı
heart-classifier
bir model getirir.model_uri = "models:/heart-classifier/latest"
Modeli UDF işlevi olarak yükleyin. Kullanıcı tanımlı işlev (UDF), kullanıcı tarafından tanımlanan ve özel mantığın kullanıcı ortamında yeniden kullanılmasına olanak sağlayan bir işlevdir.
predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double')
İpucu
İşlev tarafından döndürülen türü denetlemek için bağımsız değişkenini
result_type
predict()
kullanın.Puanlamak istediğiniz verileri okuyun:
df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
Bizim örneğimizde giriş verileri biçimindedir
CSV
ve klasörünedbfs:/data/
yerleştirilir. Bu veri kümesi tahmin etmek için hedef değişkeni içerdiğinden sütunutarget
da bırakıyoruz. Üretim senaryolarında verileriniz bu sütuna sahip olmaz.İşlevi
predict_function
çalıştırın ve tahminleri yeni bir sütuna yerleştirin. Bu örnekte tahminleri sütununapredictions
yerleştiriyoruz.df.withColumn("predictions", score_function(*df.columns))
İpucu
,
predict_function
gerekli sütunları bağımsız değişken olarak alır. Bizim örneğimizde, veri çerçevesinin tüm sütunları model tarafından beklenir ve bu nedenledf.columns
kullanılır. Modeliniz sütunların bir alt kümesini gerektiriyorsa bunları el ile tanıtabilirsiniz. Modeliniz bir imzaya sahipse, türlerin girişlerle beklenen türler arasında uyumlu olması gerekir.Tahminlerinizi depolama alanına geri yazabilirsiniz:
scored_data_path = "dbfs:/scored-data" scored_data.to_csv(scored_data_path)
Azure Machine Learning'de modeli tek başına bir Spark işinde çalıştırma
Azure Machine Learning, tek başına spark işi oluşturmayı ve Azure Machine Learning işlem hatlarında kullanılabilecek yeniden kullanılabilir bir Spark bileşeni oluşturmayı destekler. Bu örnekte, Azure Machine Learning tek başına Spark işinde çalışan ve çıkarım yapmak için bir MLflow modeli çalıştıran bir puanlama işi dağıtacağız.
Not
Azure Machine Learning'deki Spark işleri hakkında daha fazla bilgi edinmek için bkz . Azure Machine Learning'de Spark işleri gönderme (önizleme).
Spark işi, bağımsız değişkenleri alan bir Python betiği gerektirir. Puanlama betiği oluşturma:
score.py
import argparse parser = argparse.ArgumentParser() parser.add_argument("--model") parser.add_argument("--input_data") parser.add_argument("--scored_data") args = parser.parse_args() print(args.model) print(args.input_data) # Load the model as an UDF function predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda") # Read the data you want to score df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target") # Run the function `predict_function` and place the predictions on a new column scored_data = df.withColumn("predictions", score_function(*df.columns)) # Save the predictions scored_data.to_csv(args.scored_data)
Yukarıdaki betik üç bağımsız değişken
--model
--input_data
alır ve--scored_data
. İlk ikisi girişlerdir ve çalıştırmak istediğimiz modeli ve giriş verilerini temsil eder; sonuncusu bir çıkıştır ve tahminlerin yerleştirileceği çıkış klasörüdür.İpucu
Python paketlerinin yüklenmesi: Önceki puanlama betiği, MLflow modelini bir UDF işlevine yükler, ancak parametresini
env_manager="conda"
gösterir. Bu parametre ayarlandığında, MLflow yalnızca UDF işlevinin çalıştığı yalıtılmış bir ortamda model tanımında belirtilen gerekli paketleri geri yükler. Daha fazla ayrıntı için belgelere bakınmlflow.pyfunc.spark_udf
.İş tanımı oluşturma:
mlflow-score-spark-job.yml
$schema: http://azureml/sdk-2-0/SparkJob.json type: spark code: ./src entry: file: score.py conf: spark.driver.cores: 1 spark.driver.memory: 2g spark.executor.cores: 2 spark.executor.memory: 2g spark.executor.instances: 2 inputs: model: type: mlflow_model path: azureml:heart-classifier@latest input_data: type: uri_file path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv mode: direct outputs: scored_data: type: uri_folder args: >- --model ${{inputs.model}} --input_data ${{inputs.input_data}} --scored_data ${{outputs.scored_data}} identity: type: user_identity resources: instance_type: standard_e4s_v3 runtime_version: "3.2"
İpucu
Ekli bir Synapse Spark havuzu kullanmak için, özellik yerine
resources
yukarıda gösterilen örnek YAML belirtim dosyasında özelliği tanımlayıncompute
.Yukarıda gösterilen YAML dosyaları komutunda
az ml job create
parametresiyle birlikte--file
, gösterildiği gibi tek başına bir Spark işi oluşturmak için kullanılabilir:az ml job create -f mlflow-score-spark-job.yml