Compartir vía


Creación y conexión a clústeres de Ray en Azure Databricks

Aprenda a crear, configurar y ejecutar clústeres de proceso de Ray en Azure Databricks

Requisitos

Para crear un clúster de Ray, debe tener acceso a un recurso de proceso multiuso de Databricks con la siguiente configuración:

  • Databricks Runtime 12.2 LTS ML y versiones posteriores.
  • El modo de acceso debe ser o de Usuario único o Sin aislamiento compartido.

Nota:

Los clústeres de Ray no se admiten actualmente en el proceso sin servidor.

Instalación de Ray

Con Databricks Runtime ML 15.0 en adelante, Ray está preinstalado en clústeres de Azure Databricks.

En el caso de los runetimes publicados antes de la versión 15.0, use pip para instalar Ray en el clúster:

%pip install ray[default]>=2.3.0

Creación de un clúster de Ray específico de usuario en un clúster de Azure Databricks

Para crear un clúster de Ray, use la API de ray.util.spark.setup_ray_cluster.

Nota:

Al crear un clúster de Ray en un cuaderno, solo está disponible para el usuario actual del cuaderno. El clúster de Ray se apaga automáticamente después de que el cuaderno se desasocie del clúster o después de 30 minutos de inactividad (no se han enviado tareas a Ray). Si desea crear un clúster de Ray que se comparte con todos los usuarios y no está sujeto a un cuaderno en ejecución activa, use la API ray.util.spark.setup_global_ray_cluster en su lugar.

Clúster de Ray de tamaño fijo

En cualquier cuaderno de Azure Databricks que esté asociado a un clúster de Azure Databricks, puede ejecutar el siguiente comando para iniciar un clúster de Ray de tamaño fijo:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Escalado automático del clúster de Ray

Para obtener información sobre cómo iniciar un clúster de Ray de escalado automático, consulte Clústeres de Ray de escalado en Azure Databricks.

Inicio de un clúster de Ray en modo global

Con Ray 2.9.0 y versiones posteriores, puede crear un clúster de Ray en modo global en un clúster de Azure Databricks. Un clúster de Ray en modo global permite que todos los usuarios conectados al clúster de Azure Databricks también usen el clúster de Ray. Este modo de ejecución de un clúster de Ray no tiene la funcionalidad de tiempo de espera activa que un clúster de usuario único tiene al ejecutar una instancia de clúster de Ray de usuario único.

Para iniciar un clúster de rayos global en el que varios usuarios pueden asociar y ejecutar tareas de Ray, empiece por crear un trabajo de cuaderno de Azure Databricks y adjuntarlo a un clúster de Azure Databricks en modo compartido y, a continuación, ejecute el siguiente comando:

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_global_ray` API.
)

Se trata de una llamada de bloqueo que permanecerá activa hasta que interrumpa la llamada haciendo clic en el botón "Interrumpir" de la celda de comandos del cuaderno, desasociando el cuaderno del clúster de Azure Databricks o finalizando el clúster de Azure Databricks. De lo contrario, el clúster de Ray en modo global seguirá ejecutándose y estará disponible para el envío de tareas por parte de los usuarios autorizados. Para más información sobre los clústeres de modo global, consulte documentación de Ray API.

Los clústeres de modo global tienen las siguientes propiedades:

  • En un clúster de Azure Databricks, solo puede crear un clúster de Ray en modo global activo a la vez.
  • En un clúster de Azure Databricks, todos los usuarios pueden usar el clúster de Ray en modo global activo en cualquier cuaderno de Azure Databricks conectado. Puede ejecutar ray.init() para conectarse al clúster de Ray en modo global activo. Dado que varios usuarios pueden acceder a este clúster de Ray, la contención de recursos puede ser un problema.
  • El clúster de Ray en modo global está activo hasta que se interrumpe la setup_ray_cluster llamada. No tiene un tiempo de espera de apagado automático como lo hacen los clústeres de Ray de un solo usuario.

Creación de un clúster de GPU de Ray

En el caso de los clústeres de GPU, estos recursos se pueden agregar al clúster de Ray de la siguiente manera:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Conexión al clúster remoto de Ray mediante el cliente de Ray

En la versión de Ray 2.3.0 y posteriores, puede crear un clúster de Ray mediante la API de setup_ray_cluster y, en el mismo cuaderno, puede llamar a la API de ray.init() para conectarse a este clúster de Ray. Para obtener la cadena de conexión remota, use lo siguiente:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

A continuación, puede conectar el clúster remoto mediante la cadena de conexión remota anterior:

import ray
ray.init(remote_conn_str)

El cliente de Ray no admite la API del conjunto de datos de Ray definida en el módulo ray.data. Como solución alternativa, puede encapsular el código que llama a la API del conjunto de datos de Ray dentro de una tarea de Ray remota, como se muestra en el siguiente código:

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit  --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

Los valores que se deben configurar son la dirección URL del área de trabajo de Azure Databricks, empezando por https:// y, a continuación, los valores encontrados después de que se encuentren los /driver-proxy/o/ en la dirección URL del proxy del panel de Ray que se muestra después de iniciar el clúster de Ray.

La CLI de trabajos de Ray se usa para enviar trabajos a un clúster de Ray desde sistemas externos, pero no es necesario para enviar trabajos en clústeres de Ray en Azure Databricks. Se recomienda implementar el trabajo mediante trabajos de Azure Databricks, crear un clúster de Ray por aplicación y las herramientas existentes de Azure Databricks, como los conjuntos de recursos de Azure Databricks o los desencadenadores de flujo de trabajo, se usen para desencadenar el trabajo.

Establecimiento de una ubicación de la salida de registro

Puede establecer el argumento collect_log_to_path para especificar la ruta de acceso de destino en la que desea recopilar los registros del clúster de Ray. La recopilación de registros se ejecuta después de apagar el clúster de Ray.

Azure Databricks recomienda establecer una ruta de acceso a partir de /dbfs/ o ruta de acceso de volumen de Unity Catalog para conservar los registros incluso si finaliza el clúster de Apache Spark. De lo contrario, los registros no se pueden recuperar, ya que el almacenamiento local del clúster se elimina cuando se cierra el clúster.

Una vez creado un clúster de Ray, puede ejecutar cualquier código de aplicación de Ray directamente en el cuaderno. Haga clic en Abrir panel de clúster de Ray en una nueva pestaña para ver el panel de Ray del clúster.

Habilitación de seguimientos de pila y gráficos de llama en la página Actores del panel de Ray

En la página Panel de actores de Ray, puede ver seguimientos de pila y gráficos de llama para actores de Ray activos. Para ver esta información, use el siguiente comando para instalar py-spy antes de iniciar el clúster de Ray:

%pip install py-spy

Creación y configuración de procedimientos recomendados

En esta sección se describen los procedimientos recomendados para crear y configurar clústeres de Ray.

Cargas de trabajo que no son de GPU

El clúster de Ray se ejecuta sobre un clúster de Azure Databricks Spark. Un escenario típico es usar un trabajo de Spark y una UDF de Spark para realizar tareas sencillas de preprocesamiento de datos que no necesitan recursos de GPU. A continuación, use Ray para ejecutar tareas complicadas de aprendizaje automático que se benefician de GPU. En este caso, Azure Databricks recomienda establecer el parámetro de configuración de nivel de clúster de Apache Spark spark.task.resource.gpu.amount en 0, de modo que todas las transformaciones de DataFrame de Apache Spark y las ejecuciones de UDF de Apache Spark no usen recursos de GPU.

Las ventajas de esta configuración son las siguientes:

  • Aumenta el paralelismo del trabajo de Apache Spark, ya que el tipo de instancia de GPU suele tener muchos más núcleos de CPU que los dispositivos de GPU.
  • Si el clúster de Apache Spark se comparte con varios usuarios, esta configuración impide que los trabajos de Apache Spark compitan con recursos de GPU con cargas de trabajo de Ray que se ejecutan simultáneamente.

Deshabilitar la transformers integración de MLflow del instructor si la usa en tareas de Ray

La integración de MLflow del instructor transformers está habilitada de forma predeterminada desde la biblioteca de transformers. Si usa el entrenamiento de Ray para ajustar un modelo de transformers, las tareas de Ray producirán un error debido a un problema de credenciales. Sin embargo, este problema no se aplica si usa directamente MLflow para el entrenamiento. Para evitar este problema, puede establecer la variable de entorno DISABLE_MLFLOW_INTEGRATION en ‘TRUE’ desde la configuración del clúster de Azure Databricks al iniciar el clúster de Apache Spark.

Error de selección de funciones remotas de Ray

Para ejecutar tareas de Ray, Ray selecciona la función de tarea. Si encuentra un error en la selección, debe diagnosticar qué parte del código provoca el error. Las causas comunes de errores de selección son el control de referencias externas, cierres y referencias a objetos con estado. Uno de los errores más fáciles de comprobar y corregir rápidamente se puede solucionar moviendo instrucciones de importación dentro de la declaración de función de tarea.

Por ejemplo, datasets.load_dataset es una función ampliamente usada a la que se aplica revisiones en el lado del controlador de Azure Databricks Runtime, lo que representa la referencia no disponible. Para solucionarlo, simplemente puede escribir la función de tarea de la siguiente manera:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Deshabilitar el monitor de memoria de Ray si la tarea Ray se elimina inesperadamente con un error de memoria insuficiente (OOM)

En Ray 2.9.3, el monitor de memoria de Ray tiene varios problemas conocidos que pueden hacer que las tareas de Ray se detengan accidentalmente sin causa. Para abordar este problema, puede deshabilitar el monitor de memoria de Ray estableciendo la variable de entorno RAY_memory_monitor_refresh_ms en 0 desde la configuración del clúster de Azure Databricks al iniciar el clúster de Apache Spark.

Aplicación de las funciones de transformación a lotes de datos

Al procesar datos en lotes, se recomienda usar la API de datos de Ray con la función map_batches. Este enfoque puede ser más eficaz y escalable, especialmente para grandes conjuntos de datos o cálculos complejos que se benefician del procesamiento por lotes. Cualquier DataFrame de Spark se puede convertir en un conjunto de datos de Ray mediante la API de ray.data.from_spark. La salida procesada de llamar a esta API de transformación se puede escribir en tablas UC de Azure Databricks mediante la API ray.data.write_databricks_table.

Uso de MLflow en tareas de Ray

Para usar MLflow en tareas de Ray, deberá:

  • Definir las credenciales de MLflow de Azure Databricks en las tareas de Ray.
  • Crear ejecuciones de MLflow en el controlador de Apache Spark y pasar el run_id creado a las tareas de Ray.

El siguiente ejemplo de código muestra cómo hacerlo:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in <AS> driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in <AS> driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Uso de bibliotecas de Python con ámbito de cuaderno o bibliotecas de Python de clúster en tareas de Ray

Actualmente, Ray tiene un problema conocido que las tareas de Ray no pueden usar bibliotecas de Python con ámbito de cuaderno o bibliotecas de Python de clúster. Para usar dependencias adicionales dentro de los trabajos de Ray, debe instalar manualmente bibliotecas mediante el comando magic de %pip antes de iniciar un clúster de Ray en Spark que usará estas dependencias dentro de las tareas. Por ejemplo, para actualizar la versión de Ray que se usará para iniciar el clúster de Ray, puede ejecutar el siguiente comando en el cuaderno:

%pip install ray==<The Ray version you want to use> --force-reinstall

A continuación, ejecute el siguiente comando en el cuaderno para reiniciar el kernel de Python:

dbutils.library.restartPython()

Pasos siguientes