Dela via


Skapa och ansluta till Ray-kluster i Azure Databricks

Lär dig hur du skapar, konfigurerar och kör Ray-beräkningskluster i Azure Databricks

Krav

Om du vill skapa ett Ray-kluster måste du ha åtkomst till en databricks-beräkningsresurs för alla ändamål med följande inställningar:

  • Databricks Runtime 12.2 LTS ML och senare.
  • Åtkomstläget måste vara antingen Enskild användare eller Ingen isolering delas.

Kommentar

Ray-kluster stöds för närvarande inte på serverlös beräkning.

Installera Ray

Med Databricks Runtime ML 15.0 och senare är Ray förinstallerad i Azure Databricks-kluster.

För körningar som släppts före 15.0 använder du pip för att installera Ray i klustret:

%pip install ray[default]>=2.3.0

Skapa ett användarspecifikt Ray-kluster i ett Azure Databricks-kluster

Om du vill skapa ett Ray-kluster använder du api:et ray.util.spark.setup_ray_cluster .

Kommentar

När du skapar ett Ray-kluster i en notebook-fil är det bara tillgängligt för den aktuella notebook-användaren. Ray-klustret stängs automatiskt av efter att notebook-filen har kopplats från klustret eller efter 30 minuters inaktivitet (inga uppgifter har skickats till Ray). Om du vill skapa ett Ray-kluster som delas med alla användare och inte omfattas av en notebook-fil som körs aktivt använder du API:et ray.util.spark.setup_global_ray_cluster i stället.

Ray-kluster med fast storlek

I alla Azure Databricks-notebook-filer som är anslutna till ett Azure Databricks-kluster kan du köra följande kommando för att starta ett Ray-kluster med fast storlek:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Automatisk skalning av Ray-kluster

Information om hur du startar ett Ray-kluster med automatisk skalning finns i Skala Ray-kluster på Azure Databricks.

Starta ett Ray-kluster i globalt läge

Med Ray 2.9.0 och senare kan du skapa ett Ray-kluster i globalt läge i ett Azure Databricks-kluster. Med ett ray-kluster i globalt läge kan alla användare som är anslutna till Azure Databricks-klustret även använda Ray-klustret. Det här läget för att köra ett Ray-kluster har inte den aktiva timeout-funktionen som ett enanvändarkluster har när du kör en Ray-klusterinstans med en användare.

Starta ett globalt ray-kluster som flera användare kan koppla till och köra Ray-uppgifter på genom att skapa ett Azure Databricks Notebook-jobb och koppla det till ett Azure Databricks-kluster i delat läge och kör sedan följande kommando:

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_global_ray` API.
)

Det här är ett blockerande anrop som förblir aktivt tills du avbryter anropet genom att klicka på knappen Avbryt i notebook-kommandocellen, koppla från anteckningsboken från Azure Databricks-klustret eller avsluta Azure Databricks-klustret. Annars fortsätter Ray-klustret i globalt läge att köras och vara tillgängligt för uppgiftsöverföring av behöriga användare. Mer information om globala lägeskluster finns i Dokumentation om Ray API.

Globala lägeskluster har följande egenskaper:

  • I ett Azure Databricks-kluster kan du bara skapa ett ray-kluster i aktivt globalt läge i taget.
  • I ett Azure Databricks-kluster kan ray-klustret i aktivt globalt läge användas av alla användare i valfri bifogad Azure Databricks-notebook-fil. Du kan köra ray.init() för att ansluta till ray-klustret i aktivt globalt läge. Eftersom flera användare har åtkomst till det här Ray-klustret kan det vara ett problem med resurskonkurration.
  • Ray-klustret i globalt läge är igång tills anropet setup_ray_cluster avbryts. Den har ingen tidsgräns för automatisk avstängning som Ray-kluster med en användare gör.

Skapa ett Ray GPU-kluster

För GPU-kluster kan dessa resurser läggas till i Ray-klustret på följande sätt:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Ansluta till ett fjärranslutet Ray-kluster med ray-klienten

I Ray version 2.3.0 och senare kan du skapa ett Ray-kluster med hjälp av setup_ray_cluster-API:et, och i samma notebook-fil kan du anropa ray.init()-API:et för att ansluta till ray-klustret. Använd följande för att hämta fjärr-anslutningssträng:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Sedan kan du ansluta fjärrklustret med hjälp av ovanstående fjärranslutna anslutningssträng:

import ray
ray.init(remote_conn_str)

Ray-klienten stöder inte ray-datauppsättnings-API:et som definierats i modulen ray.data. Som en lösning kan du omsluta koden som anropar API:et för Ray-datauppsättningen i en fjärr-Ray-uppgift, som du ser i följande kod:

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit  --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

De värden som måste konfigureras är URL:en för Azure Databricks-arbetsytan, som börjar med https://och sedan värdena som hittas efter /driver-proxy/o/ hittas i proxy-URL:en för Ray-instrumentpanelen som visas när Ray-klustret har startats.

Ray Job CLI används för att skicka jobb till ett Ray-kluster från externa system, men krävs inte för att skicka jobb på Ray-kluster i Azure Databricks. Vi rekommenderar att jobbet distribueras med Hjälp av Azure Databricks-jobb, att ett Ray-kluster per program skapas och att befintliga Azure Databricks-verktyg, till exempel Azure Databricks Asset Bundles eller Arbetsflödesutlösare, används för att utlösa jobbet.

Ange en loggutdataplats

Du kan ange argumentet collect_log_to_path för att ange målsökvägen där du vill samla in Ray-klusterloggarna. Loggsamlingen körs när Ray-klustret har stängts av.

Azure Databricks rekommenderar att du anger en sökväg som börjar med /dbfs/ eller Unity Catalog Volume-sökvägen för att bevara loggarna även om du avslutar Apache Spark-klustret. Annars går det inte att återställa loggarna eftersom den lokala lagringen i klustret tas bort när klustret stängs av.

När du har skapat ett Ray-kluster kan du köra valfri Ray-programkod direkt i notebook-filen. Klicka på Öppna Instrumentpanel för Ray-kluster på en ny flik för att visa Ray-instrumentpanelen för klustret.

Aktivera stackspårningar och flamdiagram på sidan Aktörer på Ray Dashboard

På sidan Aktörer på Ray Dashboard kan du visa stackspårningar och flamdiagram för aktiva Ray-aktörer. Om du vill visa den här informationen använder du följande kommando för att installera py-spy innan du startar Ray-klustret:

%pip install py-spy

Skapa och konfigurera metodtips

Det här avsnittet beskriver metodtips för att skapa och konfigurera Ray-kluster.

Icke-GPU-arbetsbelastningar

Ray-klustret körs ovanpå ett Azure Databricks Spark-kluster. Ett vanligt scenario är att använda ett Spark-jobb och Spark UDF för att utföra enkla databearbetningsuppgifter som inte behöver GPU-resurser. Använd sedan Ray för att köra komplicerade maskininlärningsuppgifter som drar nytta av GPU:er. I det här fallet rekommenderar Azure Databricks att du anger konfigurationsparametern spark.task.resource.gpu.amount för Apache Spark-klusternivå till 0 så att alla Apache Spark DataFrame-transformeringar och Apache Spark UDF-körningar inte använder GPU-resurser.

Fördelarna med den här konfigurationen är följande:

  • Det ökar Apache Spark-jobbparallelliteten eftersom GPU-instanstypen vanligtvis har många fler CPU-kärnor än GPU-enheter.
  • Om Apache Spark-klustret delas med flera användare förhindrar den här konfigurationen att Apache Spark-jobb konkurrerar om GPU-resurser med ray-arbetsbelastningar som körs samtidigt.

Inaktivera transformers MLflow-integrering för utbildare om du använder den i Ray-uppgifter

MLflow-integreringen transformers för utbildaren är aktiverad som standard inifrån transformers biblioteket. Om du använder Ray Train för att finjustera en transformers modell misslyckas Ray-uppgifter på grund av ett problem med autentiseringsuppgifterna. Det här problemet gäller dock inte om du direkt använder MLflow för träning. För att undvika det här problemet kan du ange DISABLE_MLFLOW_INTEGRATION miljövariabeln till "TRUE" inifrån Azure Databricks-klusterkonfigurationen när du startar Apache Spark-klustret.

Fel vid hämtning av fjärrfunktioner i Ray

För att köra Ray-uppgifter, pickles Ray uppgiftsfunktionen. Om du upptäcker att hämtningen misslyckades måste du diagnostisera vilken del av koden som orsakar felet. Vanliga orsaker till pickling-fel är hanteringen av externa referenser, stängningar och referenser till tillståndskänsliga objekt. Ett av de enklaste felen för att verifiera och snabbt korrigera kan åtgärdas genom att flytta importinstruktioner i aktivitetsfunktionsdeklarationen.

Är till exempel datasets.load_dataset en allmänt använd funktion som korrigeras på Azure Databricks Runtime-drivrutinssidan, vilket gör referensen obildad. För att åtgärda det kan du helt enkelt skriva uppgiftsfunktionen på följande sätt:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Inaktivera Ray-minnesövervakaren om Ray-uppgiften oväntat avlivas med ett OOM-fel (out-of-memory)

I Ray 2.9.3 har rayminnesövervakaren flera kända problem som kan orsaka att Ray-uppgifter oavsiktligt stoppas utan orsak. Du kan åtgärda problemet genom att inaktivera Ray-minnesövervakaren genom att ange miljövariabeln RAY_memory_monitor_refresh_ms till 0 i Azure Databricks-klusterkonfigurationen när du startar Apache Spark-klustret.

Tillämpa transformeringsfunktioner på batchar med data

När du bearbetar data i batchar rekommenderar vi att du använder Ray Data-API:et map_batches med funktionen . Den här metoden kan vara mer effektiv och skalbar, särskilt för stora datamängder eller komplexa beräkningar som drar nytta av batchbearbetning. Alla Spark-dataramar kan konverteras till en Ray Dataset med hjälp av API:et ray.data.from_spark . Bearbetade utdata från att anropa det här transformerings-API:et kan skrivas ut till Azure Databricks UC-tabeller med hjälp av API ray.data.write_databricks_table:et .

Använda MLflow i Ray-uppgifter

Om du vill använda MLflow i Ray-uppgifter måste du :

  • Definiera autentiseringsuppgifter för Azure Databricks MLflow i Ray-uppgifter.
  • Skapa MLflow-körningar i Apache Spark-drivrutinen och skicka de skapade run_id till Ray-uppgifterna.

Följande kodexempel visar hur du gör detta:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in <AS> driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in <AS> driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Använda Python-bibliotek med notebook-omfattning eller python-klusterbibliotek i Ray-uppgifter

För närvarande har Ray ett känt problem där Ray-uppgifter inte kan använda python-bibliotek med notebook-omfattning eller python-klusterbibliotek. Om du vill använda ytterligare beroenden i dina Ray-jobb måste du installera bibliotek manuellt med hjälp av det %pip magiska kommandot innan du startar ett Ray-on-Spark-kluster som använder dessa beroenden i aktiviteter. Om du till exempel vill uppdatera den version av Ray som ska användas för att starta Ray-klustret kan du köra följande kommando i notebook-filen:

%pip install ray==<The Ray version you want to use> --force-reinstall

Kör sedan följande kommando i anteckningsboken för att starta om Python-kerneln:

dbutils.library.restartPython()

Nästa steg