Freigeben über


Erstellen von Ray-Clustern in Azure Databricks und Herstellen einer Verbindung mit diesen Clustern

Hier erfahren Sie, wie Sie Ray-Computecluster in Azure Databricks erstellen, konfigurieren und ausführen.

Anforderungen

Um einen Ray-Cluster erstellen zu können, benötigen Sie Zugriff auf eine Databricks-Ressource mit folgenden Einstellungen:

  • Databricks Runtime 12.2 LTS ML und höher
  • Der Zugriffsmodus muss Einzelbenutzer oder Keine Isolation, freigegeben sein.

Hinweis

Ray-Cluster werden derzeit für serverloses Computing nicht unterstützt.

Installieren von Ray

Ab Databricks Runtime ML 15.0 ist Ray in Azure Databricks-Clustern vorinstalliert.

Für Runtimes, die vor 15.0 veröffentlicht wurden, kann pip verwendet werden, um Ray in Ihrem Cluster zu installieren:

%pip install ray[default]>=2.3.0

Erstellen eines benutzerspezifischen Ray-Clusters in einem Azure Databricks-Cluster

Verwenden Sie die API ray.util.spark.setup_ray_cluster, um einen Ray-Cluster zu erstellen.

Hinweis

Wenn Sie einen Ray-Cluster in einem Notebook erstellen, ist dieser nur für den aktuellen Notebook-Benutzer verfügbar. Der Ray-Cluster wird automatisch heruntergefahren, nachdem das Notebook vom Cluster getrennt wurde oder nachdem 30 Minuten lang keine Aktivität stattgefunden hat (keine Aufgaben an Ray gesendet). Wenn Sie einen Ray-Cluster erstellen möchten, der für alle Benutzer freigegeben und nicht an ein aktiv ausgeführtes Notebook gebunden ist, verwenden Sie stattdessen die ray.util.spark.setup_global_ray_cluster-API.

Ray-Cluster mit fester Größe

In jedem Azure Databricks-Notebook, das an einen Azure Databricks-Cluster angefügt ist, können Sie den folgenden Befehl ausführen, um einen Ray-Cluster mit fester Größe zu starten:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Ray-Cluster mit automatischer Skalierung

Informationen zum Starten eines Ray-Clusters mit automatischer Skalierung finden Sie unter Skalieren von Ray-Clustern in Azure Databricks.

Starten eines Ray-Clusters im globalen Modus

Ab Ray 2.9.0 können Sie einen Ray-Cluster im globalen Modus in einem Azure Databricks-Cluster erstellen. Ein Ray-Cluster im globalen Modus ermöglicht es allen Benutzern, die an den Azure Databricks-Cluster angefügt sind, auch den Ray-Cluster zu verwenden. Dieser Modus für die Ausführung eines Ray-Clusters verfügt nicht über die aktive Zeitlimitfunktion, die ein Einzelbenutzercluster bei der Ausführung einer Instanz eines Ray-Einzelbenutzerclusters hat.

Um einen globalen Ray-Cluster zu starten, an den mehrere Benutzer angefügt werden können und in dem sie Ray-Aufgaben ausführen können, erstellen Sie zunächst einen Azure Databricks-Notebookauftrag, und fügen Sie ihn an einen Azure Databricks-Cluster im freigegebenen Modus an. Führen Sie dann den folgenden Befehl aus:

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_global_ray` API.
)

Dies ist ein blockierender Aufruf, der aktiv bleibt, bis Sie den Aufruf durch Klicken auf die Schaltfläche „Unterbrechen“ in der Notebook-Befehlszelle unterbrechen, das Notebook vom Azure Databricks-Cluster trennen oder den Azure Databricks-Cluster beenden. Andernfalls wird der Ray-Cluster im globalen Modus weiterhin ausgeführt und steht autorisierten Benutzern für die Übermittlung von Aufgaben zur Verfügung. Weitere Informationen zu Clustern im globalen Modus finden Sie in der Ray-API-Dokumentation.

Cluster im globalen Modus haben die folgenden Eigenschaften:

  • In einem Azure Databricks-Cluster können Sie jeweils nur einen aktiven Ray-Cluster im globalen Modus erstellen.
  • In einem Azure Databricks-Cluster kann der aktive Ray-Cluster im globalen Modus von allen Benutzern in allen angefügten Azure Databricks-Notebooks verwendet werden. Sie können ray.init() ausführen, um eine Verbindung zum aktiven Ray-Cluster im globalen Modus herzustellen. Da mehrere Benutzer auf diesen Ray-Cluster zugreifen können, kann es zu Ressourcenkonflikten kommen.
  • Der Ray-Cluster im globalen Modus bleibt aktiv, bis der setup_ray_cluster-Aufruf unterbrochen wird. Er verfügt nicht über ein automatisches Zeitlimit für das Herunterfahren wie Ray-Einzelbenutzercluster.

Erstellen eines Ray-GPU-Clusters

Bei GPU-Clustern können diese Ressourcen dem Ray-Cluster wie folgt hinzugefügt werden:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Herstellen einer Verbindung mit dem Remote-Ray-Cluster mithilfe des Ray-Clients

Ab der Ray-Version 2.3.0 können Sie einen Ray-Cluster über die setup_ray_cluster-API erstellen und im gleichen Notebook die ray.init()-API aufrufen, um eine Verbindung mit diesem Ray-Cluster herzustellen. Verwenden Sie Folgendes, um die Remoteverbindungszeichenfolge abzurufen:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Anschließend können Sie mithilfe der obigen Remoteverbindungszeichenfolge eine Verbindung für den Remotecluster herstellen:

import ray
ray.init(remote_conn_str)

Die im ray.data-Modul definierte Ray-Dataset-API wird vom Ray-Client nicht unterstützt. Als Abhilfe können Sie Ihren Code, der die Ray-Datensatz-API aufruft, in eine Remote-Ray-Aufgabe einschließen, wie im folgenden Code gezeigt:

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit  --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

Die zu konfigurierenden Werte sind die URL des Azure Databricks-Arbeitsbereichs (beginnend mit https://) sowie die Werte nach /driver-proxy/o/ in der Ray-Dashboard-Proxy-URL, die nach dem Start des Ray-Clusters angezeigt wird.

Die Ray-Auftrags-CLI wird verwendet, um Aufträge aus externen Systemen an einen Ray-Cluster zu übermitteln. Sie wird jedoch nicht benötigt, um Aufträge für Ray-Cluster in Azure Databricks zu übermitteln. Es wird empfohlen, den Auftrag mithilfe von Azure Databricks-Workflows bereitzustellen, jeweils einen Ray-Cluster pro Anwendung zu erstellen und bereits vorhandene Azure Databricks-Tools wie Azure Databricks-Ressourcenpakete oder -Workflowtrigger zum Auslösen des Auftrags zu verwenden.

Festlegen eines Speicherorts für die Protokollausgabe

Mit dem Argument collect_log_to_path können Sie den Zielpfad angeben, in dem Sie die Protokolle des Ray-Clusters sammeln möchten. Die Protokollsammlung wird ausgeführt, nachdem der Ray-Cluster heruntergefahren wurde.

Azure Databricks empfiehlt das Festlegen eines Pfads, der mit /dbfs/ beginnt, oder eines Unity Catalog-Volumepfads, damit die Protokolle erhalten bleiben, auch wenn Sie den Apache Spark-Cluster beenden. Andernfalls können Ihre Protokolle nicht wiederhergestellt werden, da der lokale Speicher im Cluster gelöscht wird, wenn der Cluster heruntergefahren wird.

Nach Erstellung eines Ray-Clusters können Sie jeden beliebigen Ray-Anwendungscode direkt in Ihrem Notebook ausführen. Klicken Sie in einer neuen Registerkarte auf Ray-Cluster-Dashboard öffnen, um das Ray-Dashboard für den Cluster anzuzeigen.

Aktivieren von Stapelüberwachungsverfolgungen und Flammendiagrammen auf der Ray Dashboard Akteure-Seite

Auf der Seite Ray Dashboard Actors können Sie Stapelüberwachungen und Flammendiagramme für aktive Ray-Akteure anzeigen. Verwenden Sie zum Anzeigen dieser Informationen den folgenden Befehl, um „py-spy“ zu installieren, bevor Sie den Ray-Cluster starten:

%pip install py-spy

Bewährte Methoden zum Erstellen und Konfigurieren

In diesem Abschnitt werden bewährte Methoden zum Erstellen und Konfigurieren von Ray-Clustern behandelt.

GPU-fremde Workloads

Der Ray-Cluster wird auf Basis eines Azure Databricks-Spark-Clusters ausgeführt. Ein typisches Szenario besteht darin, einen Spark-Auftrag und eine Spark-UDF zu verwenden, um einfache Datenvorverarbeitungsaufgaben auszuführen, für die keine GPU-Ressourcen benötigt werden. Anschließend wird Ray verwendet, um komplizierte Machine Learning-Aufgaben auszuführen, die von GPUs profitieren. In diesem Fall empfiehlt Azure Databricks, den auf Spark-Clusterebene wirksamen Konfigurationsparameter „spark.task.resource.gpu.amount“ auf „0“ festzulegen, damit die Apache Spark-DataFrame-Transformationen und Apache Spark-UDF-Ausführungen keine GPU-Ressourcen verwenden.

Die Vorteile dieser Konfiguration sind die folgenden:

  • Dies erhöht die Parallelität von Apache Spark-Aufträgen, da der GPU-Instanztyp in der Regel über viel mehr CPU-Kerne verfügt als GPU-Geräte.
  • Wenn der Apache Spark-Cluster für mehrere Benutzer freigegeben ist, verhindert diese Konfiguration, dass Apache Spark-Aufträge mit gleichzeitig ausgeführten Ray-Workloads um GPU-Ressourcen konkurrieren.

Deaktivieren der transformers-Trainer-MLflow-Integration bei Verwendung in Ray-Aufgaben

Die transformers-Trainer-MLflow-Integration wird standardmäßig innerhalb der transformers-Bibliothek aktiviert. Wenn Sie Ray Train verwenden, um ein transformers-Modell zu optimieren, sind Ray-Aufgaben aufgrund eines Problems mit Anmeldeinformationen nicht erfolgreich. Dieses Problem tritt jedoch nicht auf, wenn Sie MLflow direkt zum Trainieren verwenden. Um dieses Problem zu vermeiden, können Sie die Umgebungsvariable DISABLE_MLFLOW_INTEGRATION innerhalb der Azure Databricks-Clusterkonfiguration auf „TRUE“ festlegen, wenn Sie Ihren Apache Spark-Cluster starten.

Behandeln von Pickling-Fehlern bei Ray-Remotefunktionen

Für die Ausführung von Ray-Aufgaben serialisiert Ray die Aufgabenfunktion im Pickle-Format. Sollte dieser Vorgang nicht erfolgreich sein, müssen Sie ermitteln, durch welchen Teil Ihres Codes der Fehler verursacht wird. Häufige Ursachen für Pickling-Fehler sind die Behandlung externer Verweise, Schließungen und Verweise auf zustandsbehaftete Objekte. Einer der am einfachsten zu überprüfenden und schnell zu korrigierenden Fehler lässt sich durch Verschieben von Importanweisungen innerhalb der Aufgabenfunktionsdeklaration behandeln.

Ein Beispiel: datasets.load_dataset ist eine weit verbreitete Funktion, die aufseiten des Azure Databricks Runtime-Treibers gepatcht wird, was dazu führt, dass für den Verweis kein Pickling mehr möglich ist. Zur Behandlung dieses Problems können Sie die Aufgabenfunktion einfach wie folgt schreiben:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Deaktivieren des Ray-Arbeitsspeichermonitors, wenn die Ray-Aufgabe unerwartet mit einem OOM-Fehler (Out Of Memory; nicht genügend Arbeitsspeicher) beendet wird

In Ray 2.9.3 gibt es mehrere bekannte Probleme für den Ray-Arbeitsspeichermonitor, die dazu führen können, dass Ray-Aufgaben irrtümlich und ohne Ursache beendet werden. Um dieses Problem zu behandeln, können Sie den Ray-Arbeitsspeichermonitor deaktivieren, indem Sie die Umgebungsvariable RAY_memory_monitor_refresh_ms innerhalb der Azure Databricks-Clusterkonfiguration auf 0 festlegen, wenn Sie Ihren Apache Spark-Cluster starten.

Anwenden von Transformationsfunktionen auf Datenbatches

Bei der Batchverarbeitung von Daten empfiehlt es sich, die Ray-Daten-API mit der map_batches-Funktion zu verwenden. Dieser Ansatz kann effizienter und skalierbarer sein – insbesondere bei großen Datasets oder bei komplexen Berechnungen, die von der Batchverarbeitung profitieren. Jeder Spark-DataFrame kann mithilfe der ray.data.from_spark-API in ein Ray-Dataset konvertiert werden. Die verarbeitete Ausgabe des Aufrufs dieser Transformations-API kann mithilfe der API ray.data.write_databricks_table in Azure Databricks-UC-Tabellen geschrieben werden.

Verwenden von MLflow in Ray-Aufgaben Um MLflow in Ray-Aufgaben zu verwenden, müssen folgende Schritte ausgeführt werden:

  • Definieren Sie Azure Databricks-MLflow-Anmeldeinformationen in Ray-Aufgaben.
  • Erstellen Sie MLflow-Ausführungen innerhalb des Apache Spark-Treibers, und übergeben Sie die erstellte Ausführungs-ID (run_id) an die Ray-Aufgaben.

Diese Vorgehensweise wird im folgenden Codebeispiel veranschaulicht:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars( <Databricks>")

experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in <AS> driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in <AS> driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Verwenden Notebook-bezogener Python-Bibliotheken oder Python-Clusterbibliotheken in Ray-Aufgaben

Aufgrund eines bekannten Problems bei Ray können Ray-Aufgaben derzeit keine Notebook-bezogenen Python-Bibliotheken oder Python-Clusterbibliotheken verwenden. Wenn Sie in Ihren Ray-Aufträgen zusätzliche Abhängigkeiten nutzen möchten, müssen Sie Bibliotheken mithilfe des Magic-Befehls %pip manuell installieren, bevor Sie einen Ray on Spark-Cluster starten, der diese Abhängigkeiten innerhalb von Aufgaben verwendet. Um beispielsweise die Version von Ray zu aktualisieren, die zum Starten des Ray-Clusters verwendet wird, können Sie in Ihrem Notebook den folgenden Befehl ausführen:

%pip install ray==<The Ray version you want to use> --force-reinstall

Führen Sie dann in Ihrem Notebook den folgenden Befehl aus, um den Python-Kernel neu zu starten:

dbutils.library.restartPython()

Nächste Schritte