Delen via


Ray-clusters maken en er verbinding mee maken in Azure Databricks

Meer informatie over het maken, configureren en uitvoeren van Ray-rekenclusters in Azure Databricks

Vereisten

Als u een Ray-cluster wilt maken, moet u toegang hebben tot een databricks-rekenresource voor alle doeleinden met de volgende instellingen:

  • Databricks Runtime 12.2 LTS ML en hoger.
  • De toegangsmodus moet één gebruiker of geen isolatie gedeeld zijn.

Notitie

Ray-clusters worden momenteel niet ondersteund op serverloze berekeningen.

Ray installeren

Met Databricks Runtime ML 15.0 wordt Ray vooraf geïnstalleerd op Azure Databricks-clusters.

Voor runtimes die vóór 15.0 zijn uitgebracht, gebruikt u pip om Ray op uw cluster te installeren:

%pip install ray[default]>=2.3.0

Een gebruikersspecifiek Ray-cluster maken in een Azure Databricks-cluster

Gebruik de ray.util.spark.setup_ray_cluster-API om een Ray-cluster te maken.

Notitie

Wanneer u een Ray-cluster in een notebook maakt, is het alleen beschikbaar voor de huidige notebookgebruiker. Het Ray-cluster wordt automatisch afgesloten nadat het notebook is losgekoppeld van het cluster of na 30 minuten inactiviteit (er zijn geen taken naar Ray verzonden). Als u een Ray-cluster wilt maken dat wordt gedeeld met alle gebruikers en niet onderhevig is aan een actief actief notitieblok, gebruikt u in plaats daarvan de ray.util.spark.setup_global_ray_cluster API.

Ray-cluster met vaste grootte

In elk Azure Databricks-notebook dat is gekoppeld aan een Azure Databricks-cluster, kunt u de volgende opdracht uitvoeren om een Ray-cluster met een vaste grootte te starten:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Ray-cluster automatisch schalen

Zie Ray-clusters schalen in Azure Databricks voor meer informatie over het starten van een Ray-cluster voor automatisch schalen.

Een Ray-cluster in de globale modus starten

Met Ray 2.9.0 en hoger kunt u een Ray-cluster in de globale modus maken op een Azure Databricks-cluster. Met een Ray-cluster in de globale modus kunnen alle gebruikers die zijn gekoppeld aan het Azure Databricks-cluster ook het Ray-cluster gebruiken. Deze modus voor het uitvoeren van een Ray-cluster beschikt niet over de actieve time-outfunctionaliteit die een cluster met één gebruiker heeft bij het uitvoeren van een Ray-clusterexemplaren met één gebruiker.

Als u een globaal ray-cluster wilt starten waaraan meerdere gebruikers Ray-taken kunnen koppelen en uitvoeren, maakt u eerst een Azure Databricks-notebooktaak en koppelt u dit aan een Azure Databricks-cluster in de gedeelde modus en voert u vervolgens de volgende opdracht uit:

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_global_ray` API.
)

Dit is een blokkerende aanroep die actief blijft totdat u de oproep onderbreekt door te klikken op de knop Onderbreken in de notebookopdrachtcel, het notebook los te koppelen van het Azure Databricks-cluster of het Azure Databricks-cluster te beëindigen. Anders blijft het Ray-cluster in de globale modus actief en is het beschikbaar voor het verzenden van taken door geautoriseerde gebruikers. Zie de documentatie van Ray API voor meer informatie over globale-modusclusters.

Globale modusclusters hebben de volgende eigenschappen:

  • In een Azure Databricks-cluster kunt u slechts één actief Ray-cluster in de globale modus tegelijk maken.
  • In een Azure Databricks-cluster kan het ray-cluster in de actieve globale modus worden gebruikt door alle gebruikers in een gekoppeld Azure Databricks-notebook. U kunt uitvoeren ray.init() om verbinding te maken met het actieve ray-cluster in de globale modus. Omdat meerdere gebruikers toegang hebben tot dit Ray-cluster, kan resourceconflict een probleem zijn.
  • Het Ray-cluster in de globale modus is totdat de setup_ray_cluster aanroep wordt onderbroken. Het heeft geen time-out voor automatisch afsluiten, zoals Ray-clusters met één gebruiker wel doen.

Een Ray GPU-cluster maken

Voor GPU-clusters kunnen deze resources op de volgende manier aan het Ray-cluster worden toegevoegd:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Verbinding maken met een extern Ray-cluster met behulp van de Ray-client

In Ray-versie 2.3.0 en hoger kunt u een Ray-cluster maken met behulp van de setup_ray_cluster-API. In hetzelfde notebook kunt u de RAY.init() API aanroepen om verbinding te maken met dit Ray-cluster. Gebruik het volgende om de externe verbindingsreeks op te halen:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Vervolgens kunt u het externe cluster verbinden met behulp van de bovenstaande externe verbindingsreeks:

import ray
ray.init(remote_conn_str)

De Ray-client biedt geen ondersteuning voor de Ray-gegevensset-API die is gedefinieerd in de ray.data-module. Als tijdelijke oplossing kunt u uw code verpakken die de Ray-gegevensset-API aanroept in een externe Ray-taak, zoals wordt weergegeven in de volgende code:

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit  --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

De waarden die moeten worden geconfigureerd, zijn de URL van de Azure Databricks-werkruimte, beginnend met https://en vervolgens de waarden die worden gevonden nadat de /driver-proxy/o/ proxy-URL van raydashboard is gevonden nadat het Ray-cluster is gestart.

De Ray Job CLI wordt gebruikt voor het verzenden van taken naar een Ray-cluster vanuit externe systemen, maar is niet vereist voor het verzenden van taken op Ray-clusters in Azure Databricks. Het wordt aanbevolen om de taak te implementeren met behulp van Azure Databricks-taken, een Ray-cluster per toepassing te maken en bestaande Azure Databricks-hulpprogramma's, zoals Azure Databricks Asset Bundles of Werkstroomtriggers, te gebruiken om de taak te activeren.

Een locatie voor logboekuitvoer instellen

U kunt het argument collect_log_to_path instellen om het doelpad op te geven waar u de Ray-clusterlogboeken wilt verzamelen. Logboekverzameling wordt uitgevoerd nadat het Ray-cluster is afgesloten.

Azure Databricks raadt u aan om een pad in te stellen dat begint met /dbfs/ of het Unity Catalog-volumepad om de logboeken te behouden, zelfs als u het Apache Spark-cluster beëindigt. Anders kunnen uw logboeken niet worden hersteld omdat de lokale opslag op het cluster wordt verwijderd wanneer het cluster wordt afgesloten.

Nadat u een Ray-cluster hebt gemaakt, kunt u elke Ray-toepassingscode rechtstreeks in uw notebook uitvoeren. Klik op Ray Cluster Dashboard openen op een nieuw tabblad om het Ray-dashboard voor het cluster weer te geven.

Stacktraceringen en vlamgrafieken inschakelen op de pagina Ray Dashboard Actors

Op de pagina Ray Dashboard Actors kunt u stacktraceringen en vlamdiagrammen voor actieve Ray-acteurs bekijken. Als u deze informatie wilt bekijken, gebruikt u de volgende opdracht om py-spy te installeren voordat u het Ray-cluster start:

%pip install py-spy

Aanbevolen procedures maken en configureren

In deze sectie worden aanbevolen procedures beschreven voor het maken en configureren van Ray-clusters.

Niet-GPU-workloads

Het Ray-cluster wordt uitgevoerd op een Azure Databricks Spark-cluster. Een typisch scenario is het gebruik van een Spark-taak en Spark UDF voor het uitvoeren van eenvoudige taken voorverwerking van gegevens die geen GPU-resources nodig hebben. Gebruik vervolgens Ray om ingewikkelde machine learning-taken uit te voeren die profiteren van GPU's. In dit geval raadt Azure Databricks aan om de configuratieparameter spark.task.resource.gpu op Apache Spark-clusterniveau in te stellen op 0, zodat alle Apache Spark DataFrame-transformaties en Apache Spark UDF-uitvoeringen geen GPU-resources gebruiken.

De voordelen van deze configuratie zijn:

  • Het verhoogt parallelle uitvoering van Apache Spark-taken omdat het GPU-exemplaartype meestal veel meer CPU-kernen heeft dan GPU-apparaten.
  • Als het Apache Spark-cluster wordt gedeeld met meerdere gebruikers, voorkomt deze configuratie dat Apache Spark-taken concurreren voor GPU-resources met gelijktijdig actieve Ray-workloads.

Trainer MLflow-integratie uitschakelen transformers bij gebruik in Ray-taken

De transformers trainer MLflow-integratie is standaard ingeschakeld vanuit de transformers bibliotheek. Als u Ray Train gebruikt om een transformers model af te stemmen, mislukken Ray-taken vanwege een referentieprobleem. Dit probleem is echter niet van toepassing als u MLflow rechtstreeks gebruikt voor training. Om dit probleem te voorkomen, kunt u de DISABLE_MLFLOW_INTEGRATION omgevingsvariabele instellen op TRUE vanuit de Azure Databricks-clusterconfiguratie bij het starten van uw Apache Spark-cluster.

Fout bij het kiezen van externe functie van Ray oplossen

Om Ray-taken uit te voeren, kiest Ray de taakfunctie. Als het kiezen is mislukt, moet u vaststellen welk deel van uw code de fout veroorzaakt. Veelvoorkomende oorzaken van kiezerfouten zijn de verwerking van externe verwijzingen, sluitingen en verwijzingen naar stateful objecten. Een van de eenvoudigste fouten die u kunt controleren en snel corrigeren, kan worden opgelost door importinstructies binnen de declaratie van de taakfunctie te verplaatsen.

Is bijvoorbeeld datasets.load_dataset een veelgebruikte functie die wordt gepatcht in de stuurprogrammazijde van Azure Databricks Runtime, waardoor de verwijzing niet kan worden geselecteerd. U kunt de taakfunctie als volgt schrijven om dit te verhelpen:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Ray-geheugenmonitor uitschakelen als de Ray-taak onverwacht wordt gedood met een OOM-fout (out-of-memory)

In Ray 2.9.3 heeft de Ray-geheugenmonitor verschillende bekende problemen waardoor Ray-taken per ongeluk zonder oorzaak worden gestopt. U kunt het probleem oplossen door de Ray-geheugenmonitor uit te schakelen door de omgevingsvariabele RAY_memory_monitor_refresh_ms in 0 te stellen in de Azure Databricks-clusterconfiguratie bij het starten van uw Apache Spark-cluster.

Transformatiefuncties toepassen op batches gegevens

Wanneer u gegevens in batches verwerkt, is het raadzaam om de Ray Data-API met de map_batches functie te gebruiken. Deze benadering kan efficiënter en schaalbaarder zijn, met name voor grote gegevenssets of complexe berekeningen die profiteren van batchverwerking. Elk Spark DataFrame kan worden geconverteerd naar een Ray Dataset met behulp van de ray.data.from_spark API. De verwerkte uitvoer van het aanroepen van deze transformatie-API kan worden weggeschreven naar Azure Databricks UC-tabellen met behulp van de API ray.data.write_databricks_table.

MLflow gebruiken in Ray-taken

Als u MLflow in Ray-taken wilt gebruiken, moet u:

  • Definieer Azure Databricks MLflow-referenties binnen Ray-taken.
  • Maak MLflow-uitvoeringen in het Apache Spark-stuurprogramma en geef de gemaakte run_id taken door aan de Ray-taken.

In het volgende codevoorbeeld ziet u hoe u dit doet:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in <AS> driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in <AS> driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Python-bibliotheken met notebookbereik of Python-clusterbibliotheken gebruiken in Ray-taken

Op dit moment heeft Ray een bekend probleem waarbij Ray-taken geen python-bibliotheken met notebookbereik of python-clusterbibliotheken kunnen gebruiken. Als u aanvullende afhankelijkheden binnen uw Ray-taken wilt gebruiken, moet u bibliotheken handmatig installeren met behulp van de %pip magic-opdracht voordat u een Ray-on-Spark-cluster start dat deze afhankelijkheden binnen taken gebruikt. Als u bijvoorbeeld de versie van Ray wilt bijwerken die wordt gebruikt om het Ray-cluster te starten, kunt u de volgende opdracht uitvoeren in uw notebook:

%pip install ray==<The Ray version you want to use> --force-reinstall

Voer vervolgens de volgende opdracht uit in uw notebook om de Python-kernel opnieuw op te starten:

dbutils.library.restartPython()

Volgende stappen