Verwenden von Ray in Azure Databricks

Mit Ray 2.3.0 und höher können Sie Ray-Cluster erstellen und Ray-Anwendungen auf Apache Spark-Clustern mit Azure Databricks ausführen. Informationen zu den ersten Schritten mit maschinellem Lernen auf Ray, einschließlich Lernprogrammen und Beispielen, finden Sie in der Ray-Dokumentation. Weitere Informationen zur Ray- und Apache Spark-Integration finden Sie in der Ray on Spark API-Dokumentation.

Anforderungen

  • Databricks Runtime 12.2 LTS ML und höher
  • Der Zugriffsmodus des Databricks Runtime-Clusters muss entweder „Zugewiesen“ oder „Keine Isolierung gemeinsam genutzt“ sein.

Installieren von Ray

Verwenden Sie folgenden Befehl, um Ray zu installieren. Die [default]-Erweiterung wird von der Ray-Dashboardkomponente benötigt.

%pip install ray[default]>=2.3.0

Erstellen eines benutzerspezifischen Ray-Clusters in einem Databricks-Cluster

Verwenden Sie die API ray.util.spark.setup_ray_cluster, um einen Ray-Cluster zu erstellen.

In jedem Databricks-Notebook, das an einen Databricks-Cluster angefügt ist, können Sie den folgenden Befehl ausführen:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_worker_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

Die ray.util.spark.setup_ray_cluster-API erstellt einen Ray-Cluster auf Spark. Intern wird ein Spark-Hintergrundauftrag erstellt. Jede Spark-Aufgabe im Auftrag erstellt einen Ray Workerknoten, und der Ray-Hauptknoten wird auf dem Treiber erstellt. Das Argument num_worker_nodes stellt die Anzahl der zu erstellenden Ray-Workerknoten dar. Um die Anzahl der CPU- oder GPU-Kerne anzugeben, die jedem Ray-Workerknoten zugewiesen sind, legen Sie das Argument num_cpus_worker_node (Standardwert: 1) oder num_gpus_worker_node (Standardwert: 0) fest.

Nachdem ein Ray-Cluster erstellt wurde, können Sie jeden Ray-Anwendungscode direkt in Ihrem Notebook ausführen. Klicken Sie in einer neuen Registerkarte auf Ray-Cluster-Dashboard öffnen, um das Ray-Dashboard für den Cluster anzuzeigen.

Tipp

Wenn Sie einen Azure Databricks-Cluster für einen einzelnen Benutzer verwenden, können Sie num_worker_nodes auf ray.util.spark.MAX_NUM_WORKER_NODES festlegen, um alle verfügbaren Ressourcen für Ihren Ray-Cluster zu verwenden.

setup_ray_cluster(
  # ...
  num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

Legen Sie das Argument collect_log_to_path fest, um den Zielpfad anzugeben, an dem Sie die Protokolle des Ray-Clusters sammeln möchten. Die Protokollsammlung wird ausgeführt, nachdem der Ray-Cluster heruntergefahren wurde. Databricks empfiehlt, einen Pfad beginnend mit /dbfs/ festzulegen, damit die Protokolle beibehalten werden, auch wenn Sie den Spark-Cluster beenden. Andernfalls können Ihre Protokolle nicht wiederhergestellt werden, da der lokale Speicher im Cluster gelöscht wird, wenn der Cluster heruntergefahren wird.

Hinweis

„Damit Ihre Ray-Anwendung automatisch den erstellten Ray-Cluster verwendet, rufen Sie ray.util.spark.setup_ray_cluster auf, um die RAY_ADDRESS-Umgebungsvariable auf die Adresse des Ray-Clusters festzulegen.“ Sie können eine alternative Clusteradresse mithilfe des Arguments address der ray.init-API angeben.

Ausführen einer Ray-Anwendung

Nachdem der Ray-Cluster erstellt wurde, können Sie einen beliebigen Ray-Anwendungscode in einem Azure Databricks-Notebook ausführen.

Wichtig

Databricks empfiehlt, alle erforderlichen Bibliotheken für Ihre Anwendung mit %pip install <your-library-dependency> zu installieren, um sicherzustellen, dass sie für Ihren Ray-Cluster und ihre Anwendung verfügbar sind. Die Angabe von Abhängigkeiten im Aufruf der Ray-Init-Funktion installiert die Abhängigkeiten an einem für die Spark-Workerknoten unzugänglichen Ort, was zu Versionsinkompatibilitäten und Importfehlern führt.

Sie können beispielsweise eine einfache Ray-Anwendung in einem Azure Databricks-Notebook wie folgt ausführen:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Erstellen eines Ray-Clusters im automatischen Skalierungsmodus

In Ray 2.8.0 und höher unterstützen Ray-Cluster, die auf Databricks gestartet wurden, die Integration in die automatische Skalierung von Databricks. Siehe Automatische Skalierung von Databricks-Clustern.

Mit Ray 2.8.0 und höher können Sie einen Ray-Cluster auf einem Databricks-Cluster erstellen, der die Skalierung nach oben oder unten je nach Arbeitsauslastung unterstützt. Diese automatische Skalierungsintegration löst die automatische Skalierung von Databricks-Clustern intern innerhalb der Databricks-Umgebung aus.

Führen Sie den folgenden Befehl aus, um die automatische Skalierung zu aktivieren:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
  ... # other arguments
)

Wenn die automatische Skalierung aktiviert ist, gibt num_worker_nodes die maximale Anzahl von Ray-Workerknoten an. Die standardmäßige Mindestanzahl von Ray-Workerknoten ist null. Diese Standardeinstellung bedeutet, dass der Ray-Cluster auf null Ray-Workerknoten heruntergefahren wird, wenn er sich im Leerlauf befindet. Dies ist zwar nicht in allen Szenarien ideal für eine schnelle Reaktionsfähigkeit, kann aber, wenn es aktiviert ist, die Kosten erheblich senken.

Im automatischen Skalierungsmodus kann num_worker_nodes nicht auf ray.util.spark.MAX_NUM_WORKER_NODES festgelegt werden.

Die folgenden Argumente konfigurieren die Hoch- und Herunterskalierungsgeschwindigkeit:

  • autoscale_upscaling_speed steht für die Anzahl der Knoten, die sich in der Schwebe befinden dürfen, als ein Vielfaches der aktuellen Anzahl von Knoten. Je höher der Wert, desto aggressiver ist die Hochskalierung. Wenn dieser Wert z.B. auf 1,0 gesetzt ist, kann der Cluster jederzeit um maximal 100 % wachsen.
  • autoscale_idle_timeout_minutes steht für die Anzahl der Minuten, die vergehen müssen, bevor ein untätiger Workerknoten vom Autoscaler entfernt wird. Je kleiner der Wert, desto aggressiver ist das Herunterskalieren.

Mit Ray 2.9.0 und höher können Sie auch autoscale_min_worker_nodes festlegen, damit der Ray-Cluster nicht auf null Worker heruntergefahren wird, wenn der Ray-Cluster im Leerlauf ist.

Herstellen einer Verbindung mit dem Remote-Ray-Cluster mithilfe des Ray-Clients

Erstellen Sie in Ray 2.9.3 einen Ray-Cluster, indem Sie die setup_ray_cluster-API aufrufen. Rufen Sie im gleichen Notebook die ray.init()-API auf, um eine Verbindung mit diesem Ray-Cluster herzustellen.

Rufen Sie für einen Ray-Cluster, der sich nicht im globalen Modus befindet, mithilfe des folgenden Codes die Remoteverbindungszeichenfolge ab:

So rufen Sie die Remote-Verbindungszeichenfolge mithilfe der folgenden Aktionen ab:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Verbinden Sie den Remotecluster unter Verwendung dieser Remoteverbindungszeichenfolge:

import ray
ray.init(remote_conn_str)

Der Ray-Client unterstützt die im ray.data-Modul definierte Ray-Dataset-API nicht. Als Abhilfe können Sie Ihren Code, der die Ray-Datensatz-API aufruft, in eine Remote-Ray-Aufgabe einschließen, wie im folgenden Code gezeigt:

import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

Laden von Daten aus einem Spark-Datenrahmen

Um einen Spark DataFrame als Ray-Datensatz zu laden, müssen Sie den Spark DataFrame zunächst im Parquet-Format auf UC-Volumes oder im Databricks-Dateisystem (veraltet) speichern. Um den Zugriff auf Databricks Filesystem sicher zu steuern, empfiehlt Databricks, dass Sie Cloud-Objektspeicher in DBFS einbinden. Anschließend können Sie eine ray.data.Dataset-Instanz aus dem gespeicherten Spark DataFrame-Pfad mithilfe der folgenden Hilfsmethode erstellen:

import ray
import os
from urllib.parse import urlparse

def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

Laden von Daten aus einer Unity-Katalogtabelle über Databricks SQL Warehouse

Für Ray 2.8.0 und höher können Sie die ray.data.read_databricks_tables-API aufrufen, um Daten aus einer Databricks Unity-Katalogtabelle zu laden.

Zunächst müssen Sie die DATABRICKS_TOKEN-Umgebungsvariable auf Ihr Databricks Warehouse-Zugriffstoken festlegen. Wenn Sie Ihr Programm nicht in Databricks Runtime ausführen, legen Sie die DATABRICKS_HOST-Umgebungsvariable auf die URL des Databricks-Arbeitsbereichs fest, wie im Folgenden gezeigt:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

Rufen Sie dann ray.data.read_databricks_tables() zum Lesen aus dem Databricks SQL Warehouse auf.

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

Konfigurieren von Ressourcen, die vom Ray-Hauptknoten verwendet werden

Standardmäßig beschränkt Databricks bei der Konfiguration Ray on Spark die dem Ray-Hauptknoten zugewiesenen Ressourcen auf:

  • 0 CPU-Kerne
  • 0 GPUs
  • 128 MB Heapspeicher
  • 128 MB Objektspeicher

Der Grund: Der Ray-Hauptknoten wird normalerweise für die globale Koordination und nicht für die Ausführung von Ray-Aufgaben verwendet. Die Spark-Treiberknotenressourcen werden für mehrere Benutzer freigegeben, sodass die Standardeinstellung Ressourcen auf der Spark-Treiberseite speichert.

Mit Ray 2.8.0 und höher können Sie Ressourcen konfigurieren, die vom Ray-Hauptknoten verwendet werden. Verwenden Sie die folgenden Argumente in der setup_ray_cluster-API:

  • num_cpus_head_node: Einstellung von CPU-Kernen, die vom Ray-Hauptknoten verwendet werden
  • num_gpus_head_node: Einstellung der GPU, die vom Ray-Hauptknoten verwendet wird
  • object_store_memory_head_node: Einstellung der Speichergröße des Objektspeichers durch den Ray-Hauptknoten

Unterstützung für heterogene Cluster

Für effizientere und kostengünstigere Trainingsläufe können Sie einen Ray on Spark-Cluster erstellen und unterschiedliche Konfigurationen zwischen dem Ray-Hauptknoten und den Ray-Workerknoten festlegen. Alle Ray-Workerknoten müssen jedoch dieselbe Konfiguration aufweisen. Databricks-Cluster unterstützen keine heterogenen Cluster, aber Sie können einen Databricks-Cluster mit unterschiedlichen Treiber- und Workerinstanztypen erstellen, indem Sie eine Cluster-Richtlinie festlegen.

Beispiel:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

Optimieren der Ray-Clusterkonfiguration

Die empfohlene Konfiguration für jeden Ray-Workerknoten lautet:

  • Mindestens 4 CPU-Kerne pro Ray-Workerknoten.
  • Mindestens 10 GB Heapspeicher für jeden Ray-Workerknoten.

Wenn Sie ray.util.spark.setup_ray_cluster aufrufen, empfiehlt Databricks, num_cpus_worker_node auf einen Wert >= 4 festzulegen.

Weitere Informationen zum Optimieren des Heap-Speichers für jeden Ray-Workerknoten finden Sie unter Speicherzuweisung für Ray-Workerknoten.

Speicherzuweisung für Ray-Workerknoten

Jeder Ray-Workerknoten verwendet zwei Speichertypen: Heap-Speicher und Objektspeicherspeicher. Die zugewiesene Arbeitsspeichergröße für jeden Typ wird wie unten beschrieben bestimmt.

Der Gesamtspeicher, der jedem Ray-Workerknoten zugeordnet ist:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES ist die maximale Anzahl von Ray-Workerknoten, die auf dem Spark-Workerknoten gestartet werden können. Dies wird durch das Argument num_cpus_worker_node oder num_gpus_worker_node bestimmt.

Wenn Sie das Argument object_store_memory_per_node nicht festlegen, sind die Heap-Speichergröße und die Objektspeicherspeichergröße, die jedem Ray-Workerknoten zugeordnet ist:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

Wenn Sie das Argument object_store_memory_per_node festlegen:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

Darüber hinaus ist die Speichergröße des Objektspeichers pro Ray-Workerknoten durch den gemeinsam genutzten Speicher des Betriebssystems begrenzt. Der Höchstwert ist 30:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY ist die für den Spark-Workerknoten konfigurierte /dev/shm Datenträgergröße.

Bewährte Methoden

Wie kann ich die CPU/GPU-Anzahl für die einzelnen Ray-Workerknoten festlegen?

Databricks empfiehlt, num_cpus_worker_node auf die Anzahl von CPU-Kernen pro Spark-Workerknoten und num_gpus_worker_node auf die Anzahl von GPUs pro Spark-Workerknoten festzulegen. In dieser Konfiguration startet jeder Spark-Workerknoten einen Ray-Workerknoten, der die Ressourcen des Spark-Workerknotens vollständig nutzt.

GPU-Clusterkonfiguration

Der Ray-Cluster wird auf einem Databricks Spark-Cluster ausgeführt. Ein gängiges Szenario ist die Verwendung eines Spark-Auftrags und einer Spark-UDF für einfache Datenvorverarbeitungsaufgaben, die keine GPU-Ressourcen benötigen, und die anschließende Verwendung von Ray für komplizierte Machine Learning-Aufgaben, die von GPUs profitieren. In diesem Fall empfiehlt Databricks das Festlegen des Konfigurationsparameters spark.task.resource.gpu.amount auf Spark-Clusterebene auf 0 sodass alle Spark DataFrame-Transformationen und Spark UDF-Ausführungen keine GPU-Ressourcen verwenden.

Die Vorteile dieser Konfiguration sind die folgenden:

  • Es erhöht die Spark-Auftrags-Parallelität, da der GPU-Instanztyp in der Regel viel mehr CPU-Kerne als GPU-Geräte aufweist.
  • Wenn der Spark-Cluster für mehrere Benutzer freigegeben ist, verhindert diese Konfiguration, dass Spark-Aufträge mit gleichzeitig ausgeführten Ray-Workloads konkurrieren.

Deaktivieren der transformers-Trainer-MLflow-Integration bei Verwendung in Ray-Aufgaben

Die transformers-Trainer-MLflow-Integration ist standardmäßig aktiviert. Wenn Sie Ray-Trainer zum Trainieren verwenden, ist die Ray-Aufgabe nicht erfolgreich, da die Databricks-MLflow-Dienstanmeldeinformationen nicht für Ray-Aufgaben konfiguriert sind.

Legen Sie die DISABLE_MLFLOW_INTEGRATION-Umgebungsvariable in der Databricks-Clusterkonfiguration auf „TRUE“ fest, um dieses Problem zu vermeiden. Ausführliche Informationen zur Anmeldung bei MLflow in Ihren Ray-Traineraufgaben finden Sie im Abschnitt „Verwenden von MLflow in Ray-Aufgaben“.

Behandeln von Pickling-Fehlern bei Ray-Remotefunktionen

Bei der Ausführung von Ray-Aufgaben wird Pickling verwendet, um die Aufgabenfunktion zu serialisieren. Ermitteln Sie im Falle eines Pickling-Fehlers die Zeilen in Ihrem Code, in denen der Fehler auftritt. Allgemeine Pickling-Fehler lassen sich häufig durch Verschieben von import-Befehlen in die Aufgabenfunktion behandeln. datasets.load_dataset ist beispielsweise eine weit verbreitete Funktion, die innerhalb von Databricks Runtime gepatcht wird, was möglicherweise das Pickling eines externen Imports verhindert. Um dieses Problem zu beheben, können Sie Ihren Code wie folgt aktualisieren:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Deaktivieren des Ray-Arbeitsspeichermonitors, wenn die Ray-Aufgabe unerwartet mit einem OOM-Fehler beendet wird

In Ray 2.9.3 gibt es bekannte Probleme für den Ray-Arbeitsspeichermonitor, die dazu führen, dass Ray-Aufgaben fälschlicherweise beendet werden.

Deaktivieren Sie zur Behandlung des Problems den Ray-Arbeitsspeichermonitor, indem Sie die RAY_memory_monitor_refresh_ms-Umgebungsvariable in der Databricks-Clusterkonfiguration auf 0 festlegen.

Konfiguration von Arbeitsspeicherressourcen für Spark- und Ray-Hybridworkloads

Wenn Sie Spark- und Ray-Hybridworkloads in einem Databricks-Cluster ausführen, empfiehlt Databricks, den Spark-Executorarbeitsspeicher in der Databricks-Clusterkonfiguration auf einen kleinen Wert zu verringern (beispielsweise spark.executor.memory 4g). Das liegt daran, dass der Spark-Executor innerhalb eines Java-Prozesses ausgeführt wird, der die Garbage Collection (GC) verzögert auslöst. Die Arbeitsspeicherauslastung für die Zwischenspeicherung von Spark-Datasets ist ziemlich hoch, was zu einer Verringerung des verfügbaren Arbeitsspeichers führt, den Ray verwenden kann. Um potenzielle OOM-Fehler zu vermeiden, empfiehlt Databricks, den für „spark.executor.memory“ konfigurierten Wert auf einen Wert zu verringern, der kleiner als der Standardwert ist.

Konfiguration von Computeressourcen für Spark- und Ray-Hybridworkloads

Wenn Sie Spark- und Ray-Hybridworkloads in einem Databricks-Cluster ausführen, konfigurieren Sie die Spark-Clusterknoten und/oder die Ray-Workerknoten als automatisch skalierbare Knoten.

Ein Beispiel: Wenn Sie eine feste Anzahl von Workerknoten in einem Databricks-Cluster haben, sollten Sie die automatische Skalierung von Ray on Spark aktivieren, sodass der Ray-Cluster herunterskaliert wird, wenn gerade keine Ray-Workload ausgeführt wird. Infolgedessen werden die im Leerlauf befindlichen Clusterressourcen freigegeben, damit sie von dem Spark-Auftrag verwendet werden können.

Wenn der Spark-Auftrag abgeschlossen und der Ray-Auftrag gestartet wird, wird der Ray on Spark-Cluster hochskaliert, um die Verarbeitungsanforderungen zu erfüllen.

Sie haben auch die Möglichkeit, sowohl den Databricks-Cluster als auch den Ray on Spark-Cluster automatisch skalierbar zu machen. Die automatisch skalierbaren Knoten des Databricks-Clusters können auf maximal zehn Knoten und die Ray on Spark-Workerknoten auf maximal vier Knoten (mit jeweils einem Ray-Workerknoten pro Spark-Worker) konfiguriert werden, sodass Spark bis zu sechs Knoten für Spark-Aufgaben zuordnen kann. Das bedeutet, dass Ray-Workloads maximal vier Knotenressourcen gleichzeitig verwenden können und der Spark-Auftrag Ressourcen in einer Größenordnung von maximal sechs Knoten zuordnen kann.

Anwenden der Transformationsfunktion auf Datenbatches

Bei der Batchverarbeitung von Daten empfiehlt Databricks die Verwendung der Ray-Daten-API mit der map_batches-Funktion. Dieser Ansatz kann effizienter und skalierbarer sein – insbesondere bei großen Datasets oder bei komplexen Berechnungen, die von der Batchverarbeitung profitieren. Jeder beliebige Spark DataFrame kann mithilfe der ray.data.from_spark-API in Ray-Daten konvertiert und mithilfe der ray.data.write_databricks_table-API in die Databricks-UC-Tabelle geschrieben werden.

Verwenden von MLflow in Ray-Aufgaben

Konfigurieren Sie Folgendes, um MLflow in Ray-Aufgaben zu verwenden:

  • Databricks MLflow-Anmeldeinformationen in Ray-Aufgaben
  • Spark-treiberseitige MLflow-Ausführungen, die die generierten run_id-Werte an Ray-Aufgaben übergeben

Im Folgenden finden Sie ein Codebeispiel:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in Spark driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in Spark driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Verwenden von Notebook-bezogenen Python-Bibliotheken oder Python-Clusterbibliotheken in Ray-Aufgaben

Aufgrund eines bekannten Problems bei Ray können Ray-Aufgaben derzeit keine Notebook-bezogenen Python-Bibliotheken oder Python-Clusterbibliotheken verwenden. Führen Sie zur Behandlung dieser Einschränkung in Ihrem Notebook den folgenden Befehl aus, bevor Sie einen Ray on Spark-Cluster starten:

%pip install ray==<The Ray version you want to use> --force-reinstall

Führen Sie dann in Ihrem Notebook den folgenden Befehl aus, um den Python-Kernel neu zu starten:

dbutils.library.restartPython()

Aktivieren von Stapelüberwachungsverfolgungen und Flammendiagrammen auf der Ray Dashboard Akteure-Seite

Auf der Seite Ray Dashboard Actors können Sie Stapelüberwachungen und Flammendiagramme für aktive Ray-Akteure anzeigen.

Installieren Sie py-spy, bevor Sie den Ray-Cluster starten, um diese Informationen anzuzeigen:

%pip install py-spy

Herunterfahren eines Ray-Clusters

Rufen Sie zum Herunterfahren eines in Azure Databricks ausgeführten Ray-Clusters die ray.utils.spark.shutdown_ray_cluster-API auf.

Hinweis

Ray-Cluster werden auch heruntergefahren, wenn:

  • Sie Ihr interaktives Notebook von Ihrem Azure Databricks-Cluster trennen.
  • Ihr Azure Databricks-Auftrag abgeschlossen wird.
  • Ihr Azure Databricks-Cluster neu gestartet oder beendet wird.
  • Es gibt keine Aktivität für die angegebene Leerlaufzeit.

Notebook mit Beispielen

Das folgende Notebook veranschaulicht, wie Sie einen Ray-Cluster erstellen und eine Ray-Anwendung auf Databricks ausführen.

Ray auf Spark Starter-Notebook

Notebook abrufen

Begrenzungen

  • Für mehrere Benutzer freigegebene Azure Databricks-Cluster (Isolationsmodus aktiviert) werden nicht unterstützt.
  • Wenn Sie %pip zum Installieren von Paketen verwenden, wird der Ray-Cluster heruntergefahren. Stellen Sie sicher, dass Sie Ray starten, nachdem Sie die Installation aller Bibliotheken mit %pip abgeschlossen haben.
  • Die Verwendung von Integrationen, die die Konfiguration von ray.util.spark.setup_ray_cluster außer Kraft setzen, kann dazu führen, dass der Ray-Cluster instabil wird und der Ray-Kontext abstürzt. Beispielsweise kann die Verwendung des xgboost_ray-Pakets und das Festlegen von RayParams mit einem Akteur oder cpus_per_actor-Konfiguration über die Ray-Clusterkonfiguration im Hintergrund abstürzen.