Aracılığıyla paylaş


Azure Databricks'te Ray kümeleri oluşturma ve bu kümelere bağlanma

Azure Databricks'te Ray işlem kümeleri oluşturmayı, yapılandırmayı ve çalıştırmayı öğrenin

Gereksinimler

Bir Ray kümesi oluşturmak için aşağıdaki ayarlarla databricks çok amaçlı işlem kaynağına erişiminiz olmalıdır:

  • Databricks Runtime 12.2 LTS ML ve üzeri.
  • Erişim modu Tek kullanıcı veya Yalıtım paylaşılmamalıdır.

Not

Ray kümeleri şu anda sunucusuz işlemde desteklenmemektedir.

Ray'i yükleme

Databricks Runtime ML 15.0 ile Ray, Azure Databricks kümelerine önceden yüklenmiştir.

15.0'ten önce yayımlanan çalışma zamanları için, kümenize Ray'i yüklemek için pip kullanın:

%pip install ray[default]>=2.3.0

Azure Databricks kümesinde kullanıcıya özgü ray kümesi oluşturma

Ray kümesi oluşturmak için ray.util.spark.setup_ray_cluster API'sini kullanın.

Not

Bir not defterinde Ray kümesi oluşturduğunuzda, bu küme yalnızca geçerli not defteri kullanıcısı tarafından kullanılabilir. Ray kümesi, not defteri kümeden ayrıldıktan sonra veya 30 dakika etkinlik dışı kaldıktan sonra (Ray'e hiçbir görev gönderilmedi) otomatik olarak kapatılır. Tüm kullanıcılarla paylaşılan ve etkin olarak çalışan bir not defterine tabi olmayan bir Ray kümesi oluşturmak istiyorsanız, bunun yerine API'yi ray.util.spark.setup_global_ray_cluster kullanın.

Sabit boyutlu Ray kümesi

Azure Databricks kümesine bağlı herhangi bir Azure Databricks not defterinde, sabit boyutlu bir Ray kümesi başlatmak için aşağıdaki komutu çalıştırabilirsiniz:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Ray kümesini otomatik ölçeklendirme

Otomatik ölçeklendirme Ray kümesini başlatmayı öğrenmek için bkz . Azure Databricks'te Ray kümelerini ölçeklendirme.

Genel mod Ray kümesi başlatma

Ray 2.9.0 ve üzerini kullanarak Azure Databricks kümesinde genel mod ray kümesi oluşturabilirsiniz. Genel mod Ray kümesi, Azure Databricks kümesine bağlı tüm kullanıcıların Ray kümesini de kullanmasına olanak tanır. Bir Ray kümesini çalıştırmanın bu modu, tek kullanıcılı bir Ray kümesi örneği çalıştırırken tek kullanıcılı bir kümenin sahip olduğu etkin zaman aşımı işlevine sahip değildir.

Birden çok kullanıcının Ray görevlerine ekleyip çalıştırabileceği genel bir ray kümesi başlatmak için, bir Azure Databricks not defteri işi oluşturup bunu paylaşılan mod Azure Databricks kümesine ekleyerek başlayın ve aşağıdaki komutu çalıştırın:

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_global_ray` API.
)

Bu, not defteri komut hücresinde "Kes" düğmesine tıklayarak, not defterini Azure Databricks kümesinden ayırarak veya Azure Databricks kümesini sonlandırarak aramayı kesene kadar etkin kalan bir engelleme çağrısıdır. Aksi takdirde, genel mod Ray kümesi çalışmaya devam eder ve yetkili kullanıcılar tarafından görev gönderimi için kullanılabilir. Genel mod kümeleri hakkında daha fazla bilgi için Ray API Belgeleri'ne bakın.

Genel mod kümeleri aşağıdaki özelliklere sahiptir:

  • Azure Databricks kümesinde aynı anda yalnızca bir etkin genel mod Ray kümesi oluşturabilirsiniz.
  • Azure Databricks kümesinde etkin genel mod Ray kümesi, ekli tüm Azure Databricks not defterlerindeki tüm kullanıcılar tarafından kullanılabilir. Etkin genel mod Ray kümesine bağlanmak için komutunu çalıştırabilirsiniz ray.init() . Bu Ray kümesine birden çok kullanıcı erişebildiğinden kaynak çekişmesi sorun olabilir.
  • Genel mod Ray kümesi, çağrı kesilene setup_ray_cluster kadar çalışır durumda olur. Tek kullanıcılı Ray kümelerinde olduğu gibi otomatik kapatma zaman aşımı yoktur.

Ray GPU kümesi oluşturma

GPU kümeleri için bu kaynaklar Ray kümesine aşağıdaki şekilde eklenebilir:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Ray istemcisini kullanarak uzak Ray kümesine bağlanma

Ray sürüm 2.3.0 ve üzeri sürümlerinde, setup_ray_cluster API'sini kullanarak bir Ray kümesi oluşturabilir ve aynı not defterinde ray.init() API'sini çağırarak bu Ray kümesine bağlanabilirsiniz. Uzak bağlantı dizesi almak için aşağıdakileri kullanın:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Ardından, yukarıdaki uzak bağlantı dizesi kullanarak uzak kümeyi bağlayabilirsiniz:

import ray
ray.init(remote_conn_str)

Ray istemcisi, ray.data modülünde tanımlanan Ray veri kümesi API'sini desteklemez. Geçici bir çözüm olarak, aşağıdaki kodda gösterildiği gibi Ray veri kümesi API'sini çağıran kodunuzu uzak bir Ray görevi içinde sarmalayabilirsiniz:

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit  --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

Yapılandırılması gereken değerler, ile https://başlayan Azure Databricks çalışma alanı URL'leridir ve sonrasında bulunan /driver-proxy/o/ değerler, Ray kümesi başlatıldıktan sonra görüntülenen Ray Panosu proxy URL'sinde bulunur.

Işın İşi CLI'sı dış sistemlerden bir Ray kümesine iş göndermek için kullanılır, ancak Azure Databricks'te Işın kümelerinde iş göndermek için gerekli değildir. İşin Azure Databricks İş Akışları kullanılarak dağıtılması, uygulama başına bir Ray kümesi oluşturulması ve işi tetiklemede Azure Databricks Varlık Paketleri veya İş Akışı Tetikleyicileri gibi mevcut Azure Databricks araçlarının kullanılması önerilir.

Günlük çıkış konumu ayarlama

Ray kümesi günlüklerini toplamak istediğiniz hedef yolu belirtmek için bağımsız değişkenini collect_log_to_path ayarlayabilirsiniz. Ray kümesi kapatıldıktan sonra günlük koleksiyonu çalıştırılır.

Azure Databricks, Apache Spark kümesini sonlandırsanız bile günlükleri korumak için veya Unity Kataloğu Birim yolu ile /dbfs/ başlayan bir yol ayarlamanızı önerir. Aksi takdirde, küme kapatıldığında kümedeki yerel depolama alanı silindiğinden günlükleriniz kurtarılamaz.

Bir Ray kümesi oluşturduktan sonra, herhangi bir Ray uygulama kodunu doğrudan not defterinizde çalıştırabilirsiniz. Kümenin Ray panosunu görüntülemek için yeni bir sekmede Ray Kümesi Panosunu Aç'a tıklayın.

Ray Dashboard Actors sayfasında yığın izlemelerini ve alev grafiklerini etkinleştirme

Ray Dashboard Actors sayfasında, etkin Ray aktörleri için yığın izlemelerini ve alev grafiklerini görüntüleyebilirsiniz. Bu bilgileri görüntülemek için, Ray kümesini başlatmadan önce py-spy'ı yüklemek için aşağıdaki komutu kullanın:

%pip install py-spy

En iyi yöntemleri oluşturma ve yapılandırma

Bu bölüm, Ray kümelerini oluşturmaya ve yapılandırmaya yönelik en iyi yöntemleri kapsar.

GPU olmayan iş yükleri

Ray kümesi bir Azure Databricks Spark kümesinin üzerinde çalışır. Tipik bir senaryo, GPU kaynaklarına ihtiyaç duymayan basit veri ön işleme görevlerini gerçekleştirmek için Spark işi ve Spark UDF kullanmaktır. Ardından Gpu'lardan yararlanan karmaşık makine öğrenmesi görevlerini çalıştırmak için Ray'i kullanın. Bu durumda Azure Databricks, tüm Apache Spark DataFrame dönüştürmelerinin ve Apache Spark UDF yürütmelerinin GPU kaynaklarını kullanmaması için Spark küme düzeyi spark.task.resource.gpu.amount değerinin 0 olarak ayarlanmasını önerir.

Bu yapılandırmanın avantajları şunlardır:

  • GPU örnek türü genellikle GPU cihazlarından çok daha fazla CPU çekirdeğine sahip olduğundan Apache Spark işi paralelliğini artırır.
  • Apache Spark kümesi birden çok kullanıcıyla paylaşılıyorsa, bu yapılandırma Apache Spark işlerinin eşzamanlı olarak çalışan Ray iş yükleriyle GPU kaynakları için rekabet etmesini engeller.

Ray görevlerinde kullanıyorsanız eğitmen MLflow tümleştirmesini devre dışı bırakma transformers

Eğitmen transformers MLflow tümleştirmesi varsayılan olarak kitaplığın transformers içinden etkinleştirilir. Modelde ince ayar transformers yapmak için Ray train kullanırsanız, Kimlik bilgisi sorunu nedeniyle Ray görevleri başarısız olur. Ancak, eğitim için doğrudan MLflow kullanıyorsanız bu sorun geçerli değildir. Bu sorunu önlemek için, Apache Spark kümenizi başlatırken Azure Databricks kümesi yapılandırması içinden ortam değişkenini 'TRUE' olarak ayarlayabilirsiniz DISABLE_MLFLOW_INTEGRATION .

Ray uzaktan işlev pickling hatalarını giderme

Ray görevlerini çalıştırmak için, Ray görev işlevini seçer. Pickling işleminin başarısız olduğunu fark ederseniz kodunuzun hangi bölümünün hataya neden olduğunu tanılamanız gerekir. Pickling hatalarının yaygın nedenleri dış başvuruların, kapanışların ve durum bilgisi olan nesnelere yapılan başvuruların işlenmesidir. Doğrulanması ve hızla düzeltilmesi en kolay hatalardan biri, içeri aktarma deyimleri görev işlevi bildirimi içinde taşınarak düzeltilebilir.

Örneğin, datasets.load_dataset Azure Databricks Runtime sürücü tarafında düzeltme eki uygulanan ve başvuruyu düzgün olmayan şekilde işleyen yaygın olarak kullanılan bir işlevdir. Bunu ele almak için görev işlevini aşağıdaki gibi yazabilirsiniz:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Ray görevi bellek yetersiz (OOM) hatasıyla beklenmedik şekilde öldürülürse Ray bellek izleyicisini devre dışı bırakın

Ray 2.9.3'te Ray bellek monitörü, Işın görevlerinin nedensiz bir şekilde yanlışlıkla durdurulmasına neden olabilecek bilinen çeşitli sorunlara sahiptir. Sorunu gidermek için, Apache Spark kümenizi başlatırken ortam değişkenini RAY_memory_monitor_refresh_ms 0 Azure Databricks kümesi yapılandırması içinde olarak ayarlayarak Ray bellek izleyicisini devre dışı bırakabilirsiniz.

Veri toplu işlerine dönüştürme işlevleri uygulama

Verileri toplu olarak işlerken Ray Data API'sini işleviyle map_batches birlikte kullanmanız önerilir. Bu yaklaşım, özellikle toplu işlemeden yararlanan büyük veri kümeleri veya karmaşık hesaplamalar için daha verimli ve ölçeklenebilir olabilir. Herhangi bir Spark DataFrame, API kullanılarak bir Işın Veri Kümesine ray.data.from_spark dönüştürülebilir. Bu dönüştürme API'sinin çağrılmasından elde edilen işlenen çıkış, API ray.data.write_databricks_tablekullanılarak Azure Databricks UC tablolarına yazılabilir.

Ray görevlerinde MLflow kullanma Ray görevlerinde MLflow kullanmak için şunları yapmanız gerekir:

  • Ray görevleri içinde Azure Databricks MLflow kimlik bilgilerini tanımlayın.
  • MLflow oluşturma, Apache Spark Sürücüsü içinde çalışır ve oluşturulan run_id öğesini Ray görevlerine geçirir.

Aşağıdaki kod örneği bunun nasıl yapılacağını gösterir:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars( <Databricks>")

experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in <AS> driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in <AS> driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray görevlerinde not defteri kapsamlı Python kitaplıklarını veya küme Python kitaplıklarını kullanma

Şu anda Ray'in bilinen bir sorunu vardır ve Ray görevleri not defteri kapsamlı python kitaplıklarını veya küme python kitaplıklarını kullanamaz. Ray işlerinizdeki ek bağımlılıkları kullanmak için, görevler içinde bu bağımlılıkları kullanacak bir Ray-on-Spark kümesi başlatmadan önce magic komutunu kullanarak %pip kitaplıkları el ile yüklemeniz gerekir. Örneğin, Ray kümesini başlatmak için kullanılacak Ray sürümünü güncelleştirmek için not defterinizde aşağıdaki komutu çalıştırabilirsiniz:

%pip install ray==<The Ray version you want to use> --force-reinstall

Ardından, Python çekirdeğini yeniden başlatmak için not defterinizde aşağıdaki komutu çalıştırın:

dbutils.library.restartPython()

Sonraki adımlar