Bagikan melalui


Membuat dan menyambungkan ke kluster Ray di Azure Databricks

Pelajari cara membuat, mengonfigurasi, dan menjalankan kluster komputasi Ray di Azure Databricks

Persyaratan

Untuk membuat kluster Ray, Anda harus memiliki akses ke sumber daya komputasi serbaguna Databricks dengan pengaturan berikut:

  • Databricks Runtime 12.2 LTS ML ke atas.
  • Mode akses harus pengguna tunggal atau Tidak ada isolasi yang dibagikan.

Catatan

Kluster ray saat ini tidak didukung pada komputasi tanpa server.

Pasang Ray

Dengan Databricks Runtime ML 15.0 dan seterusnya, Ray telah diinstal sebelumnya pada kluster Azure Databricks.

Untuk runtime yang dirilis sebelum 15.0, gunakan pip untuk menginstal Ray di kluster Anda:

%pip install ray[default]>=2.3.0

Membuat kluster Ray khusus pengguna di kluster Azure Databricks

Untuk membuat kluster Ray, gunakan API ray.util.spark.setup_ray_cluster .

Catatan

Saat Anda membuat kluster Ray di buku catatan, kluster tersebut hanya tersedia untuk pengguna buku catatan saat ini. Kluster Ray secara otomatis dimatikan setelah notebook terlepas dari kluster atau setelah 30 menit tidak aktif (tidak ada tugas yang dikirimkan ke Ray). Jika Anda ingin membuat kluster Ray yang dibagikan dengan semua pengguna dan tidak tunduk pada notebook yang berjalan secara aktif, gunakan API sebagai gantinya ray.util.spark.setup_global_ray_cluster .

Kluster Ray ukuran tetap

Di notebook Azure Databricks apa pun yang dilampirkan ke kluster Azure Databricks, Anda dapat menjalankan perintah berikut untuk memulai kluster Ray ukuran tetap:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Kluster Sinar Penskalakan Otomatis

Untuk mempelajari cara memulai kluster Ray auto-scaling, lihat Kluster Scale Ray di Azure Databricks.

Memulai kluster Ray mode global

Dengan menggunakan Ray 2.9.0 ke atas, Anda dapat membuat kluster Ray mode global pada kluster Azure Databricks. Kluster Ray mode global memungkinkan semua pengguna yang melekat pada kluster Azure Databricks untuk juga menggunakan kluster Ray. Mode menjalankan kluster Ray ini tidak memiliki fungsionalitas batas waktu aktif yang dimiliki kluster pengguna tunggal saat menjalankan instans kluster Ray pengguna tunggal.

Untuk memulai kluster sinar global yang dapat dilampirkan dan dijalankan oleh beberapa pengguna, mulailah dengan membuat pekerjaan notebook Azure Databricks dan lampirkan ke kluster Azure Databricks mode bersama, lalu jalankan perintah berikut:

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_global_ray` API.
)

Ini adalah panggilan pemblokiran yang akan tetap aktif sampai Anda mengganggu panggilan dengan mengklik tombol "Interupsi" pada sel perintah notebook, melepaskan notebook dari kluster Azure Databricks, atau mengakhiri kluster Azure Databricks. Jika tidak, kluster Ray mode global akan terus berjalan dan tersedia untuk pengiriman tugas oleh pengguna yang berwenang. Untuk informasi selengkapnya tentang kluster mode global, lihat Dokumentasi Ray API.

Kluster mode global memiliki properti berikut:

  • Dalam kluster Azure Databricks, Anda hanya dapat membuat satu kluster Ray mode global aktif sekaligus.
  • Dalam kluster Azure Databricks, kluster Ray mode global aktif dapat digunakan oleh semua pengguna di notebook Azure Databricks yang dilampirkan. Anda dapat menjalankan ray.init() untuk menyambungkan ke kluster Ray mode global aktif. Karena beberapa pengguna dapat mengakses kluster Ray ini, ketidakcocokan sumber daya mungkin menjadi masalah.
  • Kluster Ray mode global aktif hingga setup_ray_cluster panggilan terganggu. Ini tidak memiliki batas waktu mati otomatis seperti yang dilakukan kluster Ray pengguna tunggal.

Membuat kluster Ray GPU

Untuk kluster GPU, sumber daya ini dapat ditambahkan ke kluster Ray dengan cara berikut:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Menyambungkan ke kluster Ray jarak jauh menggunakan klien Ray

Di Ray versi 2.3.0 ke atas, Anda dapat membuat kluster Ray menggunakan API setup_ray_cluster, dan di notebook yang sama, Anda dapat memanggil ray.init() API untuk terhubung ke kluster Ray ini. Untuk mendapatkan string koneksi jarak jauh, gunakan hal berikut:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Kemudian, Anda dapat menyambungkan kluster jarak jauh menggunakan string koneksi jarak jauh di atas:

import ray
ray.init(remote_conn_str)

Klien Ray tidak mendukung API himpunan data Ray yang ditentukan dalam modul ray.data. Sebagai solusinya, Anda dapat membungkus kode yang memanggil API himpunan data Ray di dalam tugas Ray jarak jauh, seperti yang ditunjukkan dalam kode berikut:

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit  --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

Nilai yang perlu dikonfigurasi adalah URL ruang kerja Azure Databricks, dimulai dengan https://, dan kemudian nilai yang ditemukan setelah /driver-proxy/o/ ditemukan di URL proksi Dasbor Ray yang ditampilkan setelah kluster Ray dimulai.

Ray Job CLI digunakan untuk mengirimkan pekerjaan ke kluster Ray dari sistem eksternal tetapi tidak diperlukan untuk mengirimkan pekerjaan pada kluster Ray di Azure Databricks. Disarankan agar pekerjaan disebarkan menggunakan Pekerjaan Azure Databricks, kluster Ray per aplikasi dibuat, dan alat Azure Databricks yang ada, seperti Bundel Aset Azure Databricks atau Pemicu Alur Kerja, digunakan untuk memicu pekerjaan.

Mengatur lokasi output log

Anda dapat mengatur argumen collect_log_to_path untuk menentukan jalur tujuan tempat Anda ingin mengumpulkan log kluster Ray. Pengumpulan log berjalan setelah kluster Ray dimatikan.

Azure Databricks merekomendasikan pengaturan jalur yang dimulai dengan /dbfs/ atau jalur Volume Katalog Unity untuk mempertahankan log bahkan jika Anda mengakhiri kluster Apache Spark. Jika tidak, log Anda tidak dapat dipulihkan karena penyimpanan lokal pada kluster dihapus saat kluster dimatikan.

Setelah membuat kluster Ray, Anda dapat menjalankan kode aplikasi Ray apa pun langsung di notebook Anda. Klik Buka Dasbor Kluster Ray di tab baru untuk melihat dasbor Ray untuk kluster.

Aktifkan jejak tumpukan dan grafik api di halaman Ray Dashboard Actors

Pada halaman Ray Dashboard Actors, Anda dapat melihat jejak tumpukan dan grafik api untuk aktor Ray aktif. Untuk melihat informasi ini, gunakan perintah berikut untuk menginstal py-spy sebelum Anda memulai kluster Ray:

%pip install py-spy

Membuat dan mengonfigurasi praktik terbaik

Bagian ini mencakup praktik terbaik untuk membuat dan mengonfigurasi kluster Ray.

Beban kerja non-GPU

Kluster Ray berjalan di atas kluster Azure Databricks Spark. Skenario umumnya adalah menggunakan pekerjaan Spark dan Spark UDF untuk melakukan tugas praprosem data sederhana yang tidak memerlukan sumber daya GPU. Kemudian, gunakan Ray untuk menjalankan tugas pembelajaran mesin rumit yang mendapat manfaat dari GPU. Dalam hal ini, Azure Databricks merekomendasikan pengaturan parameter konfigurasi tingkat kluster Apache Spark spark.task.resource.gpu.amount ke 0 sehingga semua transformasi Apache Spark DataFrame dan eksekusi Apache Spark UDF tidak menggunakan sumber daya GPU.

Manfaat konfigurasi ini adalah sebagai berikut:

  • Ini meningkatkan paralelisme pekerjaan Apache Spark karena jenis instans GPU biasanya memiliki lebih banyak inti CPU daripada perangkat GPU.
  • Jika kluster Apache Spark dibagikan dengan beberapa pengguna, konfigurasi ini mencegah pekerjaan Apache Spark bersaing untuk sumber daya GPU dengan menjalankan beban kerja Ray secara bersamaan.

Menonaktifkan transformers integrasi MLflow pelatih jika menggunakannya dalam tugas Ray

Integrasi transformers MLflow pelatih diaktifkan secara default dari dalam transformers pustaka. Jika Anda menggunakan pelatihan Ray untuk menyempurnakan transformers model, tugas Ray akan gagal karena masalah kredensial. Namun, masalah ini tidak berlaku jika Anda langsung menggunakan MLflow untuk pelatihan. Untuk menghindari masalah ini, Anda dapat mengatur DISABLE_MLFLOW_INTEGRATION variabel lingkungan ke 'TRUE' dari dalam konfigurasi kluster Azure Databricks saat memulai kluster Apache Spark Anda.

Kesalahan pemilihan fungsi jarak jauh Address Ray

Untuk menjalankan tugas Ray, Ray mengacarkan fungsi tugas. Jika anda menemukan pickling gagal, Anda harus mendiagnosis bagian mana dari kode Anda yang menyebabkan kegagalan. Penyebab umum kesalahan pengambilan adalah penanganan referensi eksternal, penutupan, dan referensi ke objek stateful. Salah satu kesalahan termampu untuk memverifikasi dan memperbaiki dengan cepat dapat diperbaiki dengan memindahkan pernyataan impor dalam deklarasi fungsi tugas.

Misalnya, datasets.load_dataset adalah fungsi yang banyak digunakan yang di-patch di sisi driver Azure Databricks Runtime, merender referensi unpickle-able. Untuk mengatasinya, Anda cukup menulis fungsi tugas sebagai berikut:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Nonaktifkan monitor memori Ray jika tugas Ray secara tak terduga terbunuh dengan kesalahan kehabisan memori (OOM)

Dalam Ray 2.9.3, monitor memori Ray memiliki beberapa masalah yang diketahui yang dapat menyebabkan tugas Ray dihentikan secara tidak sengaja tanpa sebab. Untuk mengatasi masalah ini, Anda dapat menonaktifkan monitor memori Ray dengan mengatur variabel RAY_memory_monitor_refresh_ms lingkungan ke 0 dalam konfigurasi kluster Azure Databricks saat memulai kluster Apache Spark Anda.

Menerapkan fungsi transformasi ke batch data

Saat memproses data dalam batch, disarankan untuk menggunakan RAY Data API dengan fungsi .map_batches Pendekatan ini bisa lebih efisien dan dapat diskalakan, terutama untuk himpunan data besar atau komputasi kompleks yang mendapat manfaat dari pemrosesan batch. Spark DataFrame apa pun dapat dikonversi ke Himpunan ray.data.from_spark Data Ray menggunakan API. Output yang diproses dari memanggil API transformasi ini dapat ditulis ke tabel Azure Databricks UC menggunakan API ray.data.write_databricks_table.

Menggunakan MLflow dalam tugas Ray

Untuk menggunakan MLflow dalam tugas Ray, Anda harus :

  • Tentukan kredensial Azure Databricks MLflow dalam tugas Ray.
  • Buat eksekusi MLflow dalam Apache Spark Driver dan teruskan yang dibuat run_id ke tugas Ray.

Contoh kode berikut menunjukkan cara melakukan ini:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in <AS> driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in <AS> driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Menggunakan pustaka Python lingkup buku catatan atau pustaka Python kluster dalam tugas Ray

Saat ini, Ray memiliki masalah yang diketahui di mana tugas Ray tidak dapat menggunakan pustaka python lingkup buku catatan atau pustaka python kluster. Untuk menggunakan dependensi tambahan dalam pekerjaan Ray, Anda harus menginstal pustaka secara manual menggunakan %pip perintah magic sebelum meluncurkan kluster Ray-on-Spark yang akan menggunakan dependensi ini dalam tugas. Misalnya, untuk memperbarui versi Ray yang akan digunakan untuk memulai kluster Ray, Anda dapat menjalankan perintah berikut di notebook Anda:

%pip install ray==<The Ray version you want to use> --force-reinstall

Kemudian, jalankan perintah berikut di notebook Anda untuk memulai ulang kernel Python:

dbutils.library.restartPython()

Langkah berikutnya