Använda Ray på Azure Databricks

Med Ray 2.3.0 och senare kan du skapa Ray-kluster och köra Ray-program på Apache Spark-kluster med Azure Databricks. Information om hur du kommer igång med maskininlärning på Ray, inklusive självstudier och exempel finns i Ray-dokumentationen. Mer information om Ray- och Apache Spark-integreringen finns i dokumentationen om Ray på Spark API.

Krav

  • Databricks Runtime 12.2 LTS ML och senare.
  • Databricks Runtime-klusteråtkomstläget måste vara antingen läget "Tilldelad" eller "Ingen isolering delad".

Installera Ray

Använd följande kommando för att installera Ray. Tillägget [default] krävs av Ray-instrumentpanelskomponenten.

%pip install ray[default]>=2.3.0

Skapa ett användarspecifikt Ray-kluster i ett Databricks-kluster

Om du vill skapa ett Ray-kluster använder du api:et ray.util.spark.setup_ray_cluster .

I alla Databricks-notebook-filer som är anslutna till ett Databricks-kluster kan du köra följande kommando:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_worker_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

API:et ray.util.spark.setup_ray_cluster skapar ett Ray-kluster på Spark. Internt skapar det ett Spark-bakgrundsjobb. Varje Spark-uppgift i jobbet skapar en Ray-arbetsnod och Ray-huvudnoden skapas på drivrutinen. Argumentet num_worker_nodes representerar antalet Ray Worker-noder som ska skapas. Om du vill ange antalet PROCESSOR- eller GPU-kärnor som tilldelats varje Ray-arbetsnod anger du argumentet num_cpus_worker_node (standardvärde: 1) eller num_gpus_worker_node (standardvärde: 0).

När ett Ray-kluster har skapats kan du köra valfri Ray-programkod direkt i notebook-filen. Klicka på Öppna Instrumentpanel för Ray-kluster på en ny flik för att visa Ray-instrumentpanelen för klustret.

Dricks

Om du använder ett Azure Databricks-kluster för en enskild användare kan du ange num_worker_nodes att ray.util.spark.MAX_NUM_WORKER_NODES använda alla tillgängliga resurser för ditt Ray-kluster.

setup_ray_cluster(
  # ...
  num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

Ange argumentet collect_log_to_path för att ange målsökvägen där du vill samla in Ray-klusterloggarna. Loggsamlingen körs när Ray-klustret har stängts av. Databricks rekommenderar att du anger en sökväg som börjar med /dbfs/ så att loggarna bevaras även om du avslutar Spark-klustret. Annars går det inte att återställa loggarna eftersom den lokala lagringen i klustret tas bort när klustret stängs av.

Kommentar

"Om du vill att ditt Ray-program automatiskt ska använda Ray-klustret som skapades anropar du ray.util.spark.setup_ray_cluster för att ange RAY_ADDRESS miljövariabeln till adressen för Ray-klustret." Du kan ange en alternativ klusteradress med argumentet address för api:et ray.init .

Köra ett Ray-program

När Ray-klustret har skapats kan du köra valfri Ray-programkod i en Azure Databricks-notebook-fil.

Viktigt!

Databricks rekommenderar att du installerar alla nödvändiga bibliotek för ditt program med för att säkerställa att de är tillgängliga för ditt Ray-kluster och -program i enlighet med %pip install <your-library-dependency> detta. Om du anger beroenden i ray init-funktionsanropet installeras beroendena på en plats som inte är tillgänglig för Spark-arbetsnoderna, vilket resulterar i versionskompatibiliteter och importfel.

Du kan till exempel köra ett enkelt Ray-program i en Azure Databricks-notebook-fil på följande sätt:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Skapa ett Ray-kluster i autoskalningsläge

I Ray 2.8.0 och senare har Ray-kluster som startats på Databricks stöd för integrering med Automatisk skalning i Databricks. Se Automatisk skalning av Databricks-kluster.

Med Ray 2.8.0 och senare kan du skapa ett Ray-kluster i ett Databricks-kluster som stöder upp- eller nedskalning enligt arbetsbelastningar. Den här integreringen av automatisk skalning utlöser automatisk skalning av Databricks-kluster internt i Databricks-miljön.

Om du vill aktivera automatisk skalning kör du följande kommando:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
  ... # other arguments
)

Om autoskalning är aktiverat num_worker_nodes anger det maximala antalet Ray Worker-noder. Standardantalet för Ray Worker-noder är 0. Den här standardinställningen innebär att när Ray-klustret är inaktivt skalar det ned till noll Ray Worker-noder. Detta kanske inte är idealiskt för snabb svarstid i alla scenarier, men när det är aktiverat kan det avsevärt minska kostnaderna.

I autoskalningsläge num_worker_nodes går det inte att ange till ray.util.spark.MAX_NUM_WORKER_NODES.

Följande argument konfigurerar uppskalnings- och nedskalningshastigheten:

  • autoscale_upscaling_speed representerar antalet noder som tillåts vara väntande som en multipel av det aktuella antalet noder. Ju högre värde, desto mer aggressiv är uppskalningen. Om detta till exempel är inställt på 1,0 kan klustret växa i storlek med högst 100 % när som helst.
  • autoscale_idle_timeout_minutes representerar det antal minuter som måste passera innan en inaktiv arbetsnod tas bort av autoskalningsreglaget. Ju mindre värde, desto mer aggressiv är nedskalningen.

Med Ray 2.9.0 och senare kan du också ställa in autoscale_min_worker_nodes för att förhindra att Ray-klustret skalar ned till noll arbetare när Ray-klustret är inaktivt.

Anslut till ett fjärranslutet Ray-kluster med ray-klienten

I Ray 2.9.3 skapar du ett Ray-kluster genom att anropa API:et setup_ray_cluster . I samma notebook-fil anropar du API:et ray.init() för att ansluta till det här Ray-klustret.

För ett Ray-kluster som inte är i globalt läge hämtar du fjärr-anslutningssträng med följande kod:

Så här hämtar du fjärr-anslutningssträng med hjälp av följande:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Anslut till fjärrklustret med hjälp av den här fjärr-anslutningssträng:

import ray
ray.init(remote_conn_str)

Ray-klienten stöder inte ray-datauppsättnings-API:et som definierats i modulen ray.data . Som en lösning kan du omsluta koden som anropar API:et för Ray-datauppsättningen i en fjärr-Ray-uppgift, som du ser i följande kod:

import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

Läsa in data från en Spark DataFrame

Om du vill läsa in en Spark DataFrame som en Ray Dataset måste du först spara Spark DataFrame till UC-volymer eller Databricks Filesystem (inaktuellt) som Parquet-format. För att styra databricks-filsystemåtkomst på ett säkert sätt rekommenderar Databricks att du monterar molnobjektlagring till DBFS. Sedan kan du skapa en ray.data.Dataset instans från den sparade Spark DataFrame-sökvägen med hjälp av följande hjälpmetod:

import ray
import os
from urllib.parse import urlparse

def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

Läsa in data från en Unity Catalog-tabell via Databricks SQL Warehouse

För Ray 2.8.0 och senare kan du anropa API:et ray.data.read_databricks_tables för att läsa in data från en Databricks Unity Catalog-tabell.

Först måste du ange DATABRICKS_TOKEN miljövariabeln till din åtkomsttoken för Databricks-lagret. Om du inte kör programmet på Databricks Runtime anger DATABRICKS_HOST du miljövariabeln till URL:en för Databricks-arbetsytan enligt följande:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

ray.data.read_databricks_tables() Anropa sedan för att läsa från Databricks SQL-lagret.

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

Konfigurera resurser som används av Ray-huvudnod

För Ray i Spark-konfigurationen begränsar Databricks som standard resurser som allokerats till Ray-huvudnoden till:

  • 0 CPU-kärnor
  • 0 GPU:er
  • 128 MB heapminne
  • Lagringsminne för 128 MB-objekt

Detta beror på att Ray-huvudnoden vanligtvis används för global samordning, inte för att köra Ray-uppgifter. Spark-drivrutinsnodresurser delas med flera användare, så standardinställningen sparar resurser på Spark-drivrutinssidan.

Med Ray 2.8.0 och senare kan du konfigurera resurser som används av Ray-huvudnoden. Använd följande argument i API:et setup_ray_cluster :

  • num_cpus_head_node: ange processorkärnor som används av Ray-huvudnoden
  • num_gpus_head_node: ställa in GPU som används av Ray-huvudnod
  • object_store_memory_head_node: ange minnesstorlek för objektlagring efter Ray-huvudnod

Stöd för heterogena kluster

För effektivare och kostnadseffektivare träningskörningar kan du skapa ett Ray på Spark-kluster och ange olika konfigurationer mellan Ray-huvudnoden och Ray-arbetsnoderna. Alla Ray-arbetsnoder måste dock ha samma konfiguration. Databricks-kluster stöder inte helt heterogena kluster, men du kan skapa ett Databricks-kluster med olika typer av drivrutins- och arbetsinstanser genom att ange en klusterprincip.

Till exempel:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

Justera Konfigurationen av Ray-klustret

Den rekommenderade konfigurationen för varje Ray-arbetsnod är:

  • Minst 4 CPU-kärnor per Ray-arbetsnod.
  • Minst 10 GB heapminne för varje Ray Worker-nod.

När du anropar ray.util.spark.setup_ray_clusterrekommenderar Databricks att du anger num_cpus_worker_node ett värde >= 4.

Mer information om hur du justerar heapminne för varje Ray-arbetsnod finns i Minnesallokering för Ray-arbetsnoder .

Minnesallokering för Ray-arbetsnoder

Varje Ray-arbetsnod använder två typer av minne: heapminne och objektlagringsminne. Den allokerade minnesstorleken för varje typ bestäms enligt beskrivningen nedan.

Det totala minne som allokeras till varje Ray-arbetsnod är:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES är det maximala antalet Ray Worker-noder som kan startas på Spark-arbetsnoden. Detta bestäms av argumentet num_cpus_worker_node eller num_gpus_worker_node.

Om du inte anger argumentet object_store_memory_per_nodeär minnesstorleken för heapminnet och objektarkivets minnesstorlek som allokerats till varje Ray-arbetsnod:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

Om du anger argumentet object_store_memory_per_node:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

Dessutom begränsas minnesstorleken för objektarkivet per Ray-arbetsnod av det delade minnet i operativsystemet. Det maximala värdet är:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY är diskstorleken /dev/shm som konfigurerats för Spark-arbetsnoden.

Bästa praxis

Så här anger du CPU/GPU-nummer för varje Ray Worker-nod?

Databricks rekommenderar att du anger num_cpus_worker_node antalet CPU-kärnor per Spark-arbetsnod och anger num_gpus_worker_node antalet GPU:er per Spark-arbetsnod. I den här konfigurationen startar varje Spark-arbetsnod en Ray Worker-nod som fullt ut använder resurserna för Spark-arbetsnoden.

Konfiguration av GPU-kluster

Ray-klustret körs ovanpå ett Databricks Spark-kluster. Ett vanligt scenario är att använda ett Spark-jobb och Spark UDF för att utföra enkla databearbetningsuppgifter som inte behöver GPU-resurser och sedan använda Ray för att utföra komplicerade maskininlärningsuppgifter som drar nytta av GPU:er. I det här fallet rekommenderar Databricks att du ställer in konfigurationsparametern spark.task.resource.gpu.amount på Spark-klusternivå till 0, så att alla Spark DataFrame-transformeringar och Spark UDF-körningar inte använder GPU-resurser.

Fördelarna med den här konfigurationen är följande:

  • Det ökar Spark-jobbparallelliteten eftersom GPU-instanstypen vanligtvis har många fler CPU-kärnor än GPU-enheter.
  • Om Spark-klustret delas med flera användare förhindrar den här konfigurationen att Spark-jobb konkurrerar om GPU-resurser med ray-arbetsbelastningar som körs samtidigt.

Inaktivera transformers mlflow-integrering för utbildare om du använder den i Ray-uppgifter

Tränings-MLflow-integreringen transformers är aktiverad som standard. Om du använder Ray Train för att träna den misslyckas Ray-uppgiften eftersom Databricks MLflow-tjänstens autentiseringsuppgifter inte har konfigurerats för Ray-uppgifter.

Undvik det här problemet genom att ange DISABLE_MLFLOW_INTEGRATION miljövariabeln till "TRUE" i databricks-klusterkonfigurationen. Mer information om hur du loggar in på MLflow i dina Ray-träningsuppgifter finns i avsnittet "Använda MLflow i Ray-uppgifter" för mer information.

Fel vid hämtning av fjärrfunktioner i Ray

För att köra Ray-uppgifter använder Ray pickle för att serialisera uppgiftsfunktionen. Om hämtningen misslyckas ska du fastställa raderna i koden där felet inträffar. Om du flyttar import kommandon till uppgiftsfunktionen åtgärdas ofta vanliga hämtningsfel. Till exempel datasets.load_dataset är en ofta använd funktion som råkar korrigeras i Databricks Runtime, vilket potentiellt gör en extern import obildad. Du kan åtgärda problemet genom att uppdatera koden så här:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Inaktivera Ray-minnesövervakaren om Ray-uppgiften oväntat avlivas med OOM-fel

I Ray 2.9.3 har Ray memory monitor kända problem som gör att Ray-uppgifter avlivas felaktigt.

Du kan åtgärda problemet genom att inaktivera Ray-minnesövervakaren genom att ange miljövariabeln RAY_memory_monitor_refresh_ms till 0 i konfigurationen för Databricks-klustret.

Minnesresurskonfiguration för Spark- och Ray-hybridarbetsbelastningar

Om du kör Spark- och Ray-hybridarbetsbelastningar i ett Databricks-kluster rekommenderar Databricks att du minskar Spark-körminnet till ett litet värde, till exempel inställningen spark.executor.memory 4g i konfigurationen för Databricks-klustret. Detta beror på att Spark-kören körs i en Java-process som utlöser skräpinsamling (GC) lazily. Minnestrycket för Cachelagring av Spark-datauppsättningar är ganska högt, vilket ger en minskning av det tillgängliga minne som Ray kan använda. För att undvika potentiella OOM-fel rekommenderar Databricks att du minskar det konfigurerade värdet "spark.executor.memory" till ett mindre värde än standardvärdet.

Beräkningsresurskonfiguration för Spark- och Ray-hybridarbetsbelastningar

Om du kör Hybrid Spark- och Ray-arbetsbelastningar i ett Databricks-kluster anger du antingen Spark-klusternoderna till autoskalbara, Ray Worker-noderna till autoskalbara eller båda med automatisk skalning aktiverat.

Om du till exempel har ett fast antal arbetsnoder i ett Databricks-kluster kan du överväga att aktivera automatisk skalning av Ray-on-Spark, så att Ray-klustret skalas ned när ingen Ray-arbetsbelastning körs. Därför släpps de inaktiva klusterresurserna så att Spark-jobbet kan använda dem.

När Spark-jobbet är klart och Ray-jobbet startar utlöser det Ray-on-Spark-klustret för att skala upp för att uppfylla bearbetningskraven.

Du kan också göra både Databricks-klustret och Ray-on-spark-klustret automatiskt skalbara. Mer specifikt kan du konfigurera autoskalbara noder i Databricks-klustret till högst 10 noder och Ray-on-Spark-arbetsnoderna till högst 4 noder (med en Ray Worker-nod per Spark-arbetare), vilket gör att Spark kan allokera upp till 6 noder för Spark-uppgifter. Det innebär att Ray-arbetsbelastningar kan använda högst 4 nodresurser samtidigt, medan Spark-jobbet kan allokera högst 6 noder med resurser.

Tillämpa transformeringsfunktionen på batchar med data

När du bearbetar data i batchar rekommenderar Databricks att du använder Ray Data-API:et med map_batches funktionen. Den här metoden kan vara mer effektiv och skalbar, särskilt för stora datamängder eller när du utför komplexa beräkningar som drar nytta av batchbearbetning. Alla Spark DataFrame-data kan konverteras till Ray-data med hjälp av API:et ray.data.from_spark och kan skrivas ut till databricks UC-tabellen med hjälp av API ray.data.write_databricks_table:et .

Använda MLflow i Ray-uppgifter

Konfigurera följande om du vill använda MLflow i Ray-uppgifter:

  • Databricks MLflow-autentiseringsuppgifter i Ray-uppgifter
  • MLflow körs på Spark-drivrutinssidan som skickar de genererade run_id värdena till Ray-uppgifter.

Följande kod är ett exempel:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in Spark driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in Spark driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Använda python-bibliotek med notebook-omfattning eller python-klusterbibliotek i Ray-uppgifter

Ray har för närvarande ett känt problem med att Ray-uppgifter inte kan använda Python-bibliotek med notebook-omfattning eller python-klusterbibliotek. Du kan åtgärda den här begränsningen genom att köra följande kommando i anteckningsboken innan du startar ett Ray-on-Spark-kluster:

%pip install ray==<The Ray version you want to use> --force-reinstall

och kör sedan följande kommando i notebook-filen för att starta om Python-kerneln:

dbutils.library.restartPython()

Aktivera stackspårningar och flamdiagram på sidan Aktörer på Ray Dashboard

På sidan Aktörer på Ray Dashboard kan du visa stackspårningar och flamdiagram för aktiva Ray-aktörer.

Om du vill visa den här informationen installerar du py-spy innan du startar Ray-klustret:

%pip install py-spy

Stänga av ett Ray-kluster

Om du vill stänga av ett Ray-kluster som körs på Azure Databricks anropar du api:et ray.utils.spark.shutdown_ray_cluster .

Kommentar

Ray-kluster stängs också av när:

  • Du kopplar från den interaktiva notebook-filen från Azure Databricks-klustret.
  • Ditt Azure Databricks-jobb slutförs.
  • Ditt Azure Databricks-kluster startas om eller avslutas.
  • Det finns ingen aktivitet för den angivna inaktiva tiden.

Exempelnotebook-fil

Följande notebook-fil visar hur du skapar ett Ray-kluster och kör ett Ray-program på Databricks.

Ray på Spark Starter Notebook

Hämta notebook-fil

Begränsningar

  • Delade Azure Databricks-kluster med flera användare (isoleringsläge aktiverat) stöds inte.
  • När du använder %pip för att installera paket stängs Ray-klustret av. Se till att starta Ray när du har installerat alla bibliotek med %pip.
  • Om du använder integreringar som åsidosätter konfigurationen från ray.util.spark.setup_ray_cluster kan ray-klustret bli instabilt och krascha Ray-kontexten. Om du till exempel använder xgboost_ray paketet och inställningen RayParams med en aktör eller cpus_per_actor konfiguration som överstiger Ray-klusterkonfigurationen kan ray-klustret krascha i tysthet.