Mengintegrasikan MLflow dan Ray

MLflow adalah platform rekayasa AI sumber terbuka terbesar untuk agen, LLM, dan model ML. MLflow memungkinkan tim dari semua ukuran untuk men-debug, mengevaluasi, memantau, dan mengoptimalkan aplikasi AI mereka sambil mengontrol biaya dan mengelola akses ke model dan data. Dengan lebih dari 30 juta unduhan bulanan, ribuan organisasi mengandalkan MLflow setiap hari untuk mengirim AI ke produksi dengan percaya diri. Menggabungkan Ray dengan MLflow memungkinkan Anda mendistribusikan beban kerja dengan Ray dan melacak model, metrik, parameter, dan metadata yang dihasilkan selama pelatihan dengan MLflow.

Artikel ini membahas cara mengintegrasikan MLflow dengan komponen Ray berikut:

  • Ray Core: Aplikasi terdistribusi tujuan umum yang tidak tercakup oleh Ray Tune dan Ray Train
  • Ray Train: Pelatihan model terdistribusi
  • Ray Tune: Penyetelan hiperparameter terdistribusi

Mengintegrasikan Ray Core dan MLflow

Ray Core menyediakan blok bangunan dasar untuk aplikasi terdistribusi tujuan umum. Ini memungkinkan Anda untuk menskalakan fungsi dan kelas Python di beberapa simpul.

Bagian ini menjelaskan pola berikut untuk mengintegrasikan Ray Core dan MLflow:

  • Mencatat model MLflow dari proses driver Ray
  • Mencatat model MLflow dari eksekusi turunan

Log MLflow dari proses driver Ray

Umumnya sebaiknya mencatat model MLflow dari proses driver daripada dari node pekerja. Hal ini disebabkan oleh kompleksitas tambahan dalam meneruskan referensi stateful ke pekerja jarak jauh.

Misalnya, kode berikut mengalami kegagalan karena MLflow Tracking Server tidak diinisialisasi menggunakan MLflow Client dari dalam node pekerja.

import mlflow

@ray.remote
def example_logging_task(x):
# ...

 # This method will fail
 mlflow.log_metric("x", x)
 return x

with mlflow.start_run() as run:
 ray.get([example_logging_task.remote(x) for x in range(10)])

Sebagai gantinya, kembalikan metrik ke simpul driver. Metrik dan metadata umumnya cukup kecil untuk ditransfer kembali ke driver tanpa menyebabkan masalah memori.

Ambil contoh yang ditunjukkan di atas dan perbarui untuk mencatat metrik yang dikembalikan dari tugas Ray:

import mlflow

@ray.remote
def example_logging_task(x):
 # ...
 return x

with mlflow.start_run() as run:
  results = ray.get([example_logging_task.remote(x) for x in range(10)])
 for x in results:
   mlflow.log_metric("x", x)

Untuk tugas yang memerlukan penyimpanan artefak besar, seperti tabel Panda besar, gambar, plot, atau model, Databricks merekomendasikan untuk mempertahankan artefak sebagai file. Kemudian, muat ulang artefak dalam konteks driver atau langsung catat objek dengan MLflow dengan menentukan jalur ke file yang disimpan.

import mlflow

@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
  f.write(myLargeObject)
return x

with mlflow.start_run() as run:
 results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
  mlflow.log_metric("x", x)
  # Directly log the saved file by specifying the path
  mlflow.log_artifact("/dbfs/myLargeFilePath.txt")

Tugas Ray Log sebagai jalankan turunan MLflow

Anda dapat mengintegrasikan Ray Core dengan MLflow dengan menggunakan eksekusi turunan. Tindakan ini memerlukan langkah-langkah berikut:

  1. Membuat eksekusi induk: Menginisialisasi eksekusi induk dalam proses driver. Run ini berfungsi sebagai kontainer hierarkis untuk semua run anak berikutnya.
  2. Membuat eksekusi anak: Dalam setiap tugas Ray, mulai eksekusi anak di bawah eksekusi induk. Setiap anak yang dijalankan dapat mencatat metriknya sendiri secara independen.

Untuk menerapkan pendekatan ini, pastikan bahwa setiap tugas Ray menerima kredensial klien yang diperlukan dan induk run_id. Penyiapan ini menetapkan hubungan induk-anak hierarkis antara jalankan. Cuplikan kode berikut menunjukkan cara mengambil kredensial dan meneruskan elemen induk run_id:

from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"

mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
   import os
  # Set the MLflow credentials within the Ray task
   os.environ.update(mlflow_db_creds)
  # Set the active MLflow experiment within each Ray task
   mlflow.set_experiment(experiment_name)
  # Create nested child runs associated with the parent run_id
   with mlflow.start_run(run_id=run_id, nested=True):
    # Log metrics to the child run within the Ray task
       mlflow.log_metric("x", x)

  return x

# Start parent run on the main driver process
with mlflow.start_run() as run:
  # Pass the parent run's run_id to each Ray task
   results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray Train dan MLflow

Cara paling sederhana untuk mencatat model Ray Train ke MLflow adalah dengan menggunakan checkpoint yang dihasilkan oleh proses pelatihan. Setelah eksekusi pelatihan selesai, muat ulang model dalam kerangka kerja pembelajaran mendalam aslinya (seperti PyTorch atau TensorFlow), lalu catat dengan kode MLflow yang sesuai.

Pendekatan ini memastikan model disimpan dengan benar dan siap untuk evaluasi atau penyebaran.

Kode berikut memuat ulang model dari titik pemeriksaan Ray Train dan mencatatnya ke MLflow:

result = trainer.fit()

checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
     # Change as needed for different DL frameworks
    checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
    # Load the model from the checkpoint
    model = MyModel.load_from_checkpoint(checkpoint_path)

with mlflow.start_run() as run:
    # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Meskipun umumnya merupakan praktik terbaik untuk mengirim objek kembali ke simpul driver, menyimpan hasil akhir dengan Ray Train lebih mudah daripada menyimpan seluruh riwayat pelatihan dari proses pekerja.

Untuk menyimpan beberapa model dari sesi pelatihan, tentukan jumlah checkpoint yang akan disimpan di ray.train.CheckpointConfig. Model kemudian dapat dibaca dan dicatat dengan cara yang sama seperti menyimpan satu model.

Catatan

MLflow tidak bertanggung jawab untuk menangani toleransi kesalahan selama pelatihan model melainkan untuk melacak siklus hidup model. Toleransi kesalahan malah dikelola oleh Ray Train itu sendiri.

Untuk menyimpan metrik pelatihan yang ditentukan oleh Ray Train, ambil dari objek hasil dan simpan menggunakan MLflow.

result = trainer.fit()

with mlflow.start_run() as run:
    mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))

  # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Untuk mengonfigurasi kluster Spark dan Ray dengan benar dan mencegah masalah alokasi sumber daya, Anda harus menyesuaikan resources_per_worker pengaturan. Secara khusus, atur jumlah CPU untuk setiap pekerja Ray menjadi satu kurang dari jumlah total CPU yang tersedia pada simpul pekerja Ray. Penyesuaian ini sangat penting karena jika trainer mencadangkan semua inti yang tersedia untuk aktor Ray, itu dapat menyebabkan kesalahan perebutan sumber daya.

Ray Tune dan MLflow

Mengintegrasikan Ray Tune dengan MLflow memungkinkan Anda melacak dan mencatat eksperimen penyetelan hiperparameter secara efisien dalam Databricks. Integrasi ini memanfaatkan kemampuan pelacakan eksperimen MLflow untuk merekam metrik dan hasil langsung dari tugas Ray.

Pendekatan yang dijalankan anak-anak untuk pengelogan

Mirip dengan pengelogan dari tugas Ray Core, aplikasi Ray Tune dapat menggunakan pendekatan yang dijalankan anak untuk mencatat metrik dari setiap percobaan atau penyetelan iterasi. Gunakan langkah-langkah berikut untuk menerapkan pendekatan yang dikelola oleh anak.

  1. Membuat eksekusi induk: Menginisialisasi eksekusi induk dalam proses driver. Eksekusi ini berfungsi sebagai kontainer utama untuk semua eksekusi anak berikutnya.
  2. Dalam setiap tugas Ray Tune, log eksekusi anak dibuat menjadi bagian dari eksekusi induk, mempertahankan hierarki yang jelas dari hasil eksperimen.

Contoh berikut menunjukkan cara mengautentikasi dan mencatat log tugas Ray Tune dengan menggunakan MLflow.

import os
import tempfile
import time

import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars

from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow

mlflow_db_creds = get_databricks_env_vars("databricks")

EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)

def evaluation_fn(step, width, height):
   return (0.1 + width * step / 100) ** (-1) + height * 0.1

def train_function_mlflow(config, run_id):
   os.environ.update(mlflow_db_creds)
   mlflow.set_experiment(EXPERIMENT_NAME)

   # Hyperparameters
   width = config["width"]
   height = config["height"]

   with mlflow.start_run(run_id=run_id, nested=True):
       for step in range(config.get("steps", 100)):
           # Iterative training function - can be any arbitrary training procedure
           intermediate_score = evaluation_fn(step, width, height)
           # Log the metrics to MLflow
           mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
           # Feed the score back to Tune.
           train.report({"iterations": step, "mean_loss": intermediate_score})
           time.sleep(0.1)

def tune_with_setup(run_id, finish_fast=True):
   os.environ.update(mlflow_db_creds)
   # Set the experiment or create a new one if it does not exist.
   mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

   tuner = tune.Tuner(
       tune.with_parameter(train_function_mlflow, run_id),
       tune_config=tune.TuneConfig(num_samples=5),
       run_config=train.RunConfig(
           name="mlflow",
       ),
       param_space={
           "width": tune.randint(10, 100),
           "height": tune.randint(0, 100),
           "steps": 20 if finish_fast else 100,
       },
   )
   results = tuner.fit()

with mlflow.start_run() as run:
   mlflow_tracking_uri = mlflow.get_tracking_uri()
   tune_with_setup(run.info.run_id)

Model Serving

Menggunakan Ray Serve pada kluster Databricks untuk inferensi real time menimbulkan tantangan karena keterbatasan keamanan dan konektivitas jaringan saat berinteraksi dengan aplikasi eksternal.

Databricks merekomendasikan penggunaan Model Serving untuk menyebarkan model pembelajaran mesin dalam produksi ke titik akhir REST API. Untuk informasi selengkapnya, lihat Gambaran umum model kustom.