在 Azure Databricks 上使用 Ray

通过 Ray 2.3.0 及更高版本,可使用 Azure Databricks 在 Apache Spark 群集上创建 Ray 群集和运行 Ray 应用程序。 有关 Ray 上的机器学习入门信息(包括教程和示例),请参阅 Ray 文档。 有关 Ray 和 Apache Spark 集成的详细信息,请参阅 Spark 上的 Ray API 文档

要求

  • Databricks Runtime 12.2 LTS ML 及更高版本。
  • Databricks Runtime 群集访问模式必须是“已分配”模式或“无隔离共享”模式。

安装 Ray

使用以下命令安装 Ray。 Ray 仪表板组件需要 [default] 扩展。

%pip install ray[default]>=2.3.0

在 Databricks 群集中创建特定于用户的 Ray 群集

若要创建 Ray 群集,请使用 ray.util.spark.setup_ray_cluster API。

在任何附加到 Databricks 群集的 Databricks 笔记本中,都可以运行以下命令:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_worker_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

ray.util.spark.setup_ray_cluster API 在 Spark 上创建 Ray 群集。 在内部,它会创建一个后台 Spark 作业。 该作业中的每个 Spark 任务创建一个 Ray 工作器节点,Ray 头节点是在驱动程序上创建的。 参数 num_worker_nodes 表示要创建的 Ray 工作器节点数量。 若要指定分配给每个 Ray 工作器节点的 CPU 或 GPU 核心数,请设置参数 num_cpus_worker_node(默认值:1)或 num_gpus_worker_node(默认值:0)。

创建 Ray 群集后,可以直接在笔记本中运行任何 Ray 应用程序代码。 单击“在新选项卡中打开 Ray 群集仪表板”,查看群集的 Ray 仪表板

提示

如果使用的是 Azure Databricks 单用户群集,可以将 num_worker_nodes 设置为 ray.util.spark.MAX_NUM_WORKER_NODES,以便对 Ray 群集使用所有可用资源。

setup_ray_cluster(
  # ...
  num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

设置参数 collect_log_to_path 以指定要收集 Ray 群集日志的目标路径。 在关闭 Ray 群集后,日志收集将会运行。 Databricks 建议设置一个以 /dbfs/ 开头的路径,这样,即使终止 Spark 群集,也会保留日志。 否则,日志不可恢复,因为群集关闭时会删除群集上的本地存储。

注意

“要使 Ray 应用程序自动使用创建的 Ray 群集,请调用 ray.util.spark.setup_ray_cluster,以将 RAY_ADDRESS 环境变量设置为 Ray 群集的地址。” 可以使用 ray.init API 的 address 参数指定替代群集地址。

运行 Ray 应用程序

创建 Ray 群集后,可以在 Azure Databricks 笔记本中运行任何 Ray 应用程序代码。

重要

Databricks 建议使用 %pip install <your-library-dependency> 为应用程序安装任何必要的库,以确保这些库相应地可供 Ray 群集和应用程序使用。 在 Ray init 函数调用中指定依赖项会将依赖项安装在 Spark 工作器节点无法访问的位置,这会导致版本不兼容和导入错误。

例如,可以在 Azure Databricks 笔记本中运行简单的 Ray 应用程序,如下所示:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

在自动缩放模式下创建 Ray 群集

在 Ray 2.8.0 及更高版本中,在 Databricks 上启动的 Ray 群集支持与 Databricks 自动缩放集成。 请参阅 Databricks 群集自动缩放

使用 Ray 2.8.0 及更高版本,可以在 Databricks 群集上创建一个支持根据工作负载进行纵向扩展或缩减的 Ray 群集。 此自动缩放集成在 Databricks 环境内部触发 Databricks 群集自动缩放。

若要启用自动缩放,请运行以下命令:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
  ... # other arguments
)

如果已启用自动缩放,num_worker_nodes 指示 Ray 工作器节点的最大数目。 Ray 工作器节点的默认最小数目为 0。 此默认设置意味着,当 Ray 群集处于空闲状态时,它会纵向缩减到零个 Ray 工作器节点。 这可能并不是所有方案中快速响应的理想方案,但启用后,可以大大降低成本。

在自动缩放模式下,无法将 num_worker_nodes 设置为 ray.util.spark.MAX_NUM_WORKER_NODES

以下参数用于配置纵向扩展和缩减速度:

  • autoscale_upscaling_speed 表示允许挂起的节点数,为当前节点数的倍数。 该值越大,纵向扩展就越激进。 例如,如果此值设置为 1.0,则群集大小可以随时以最大 100% 的速度增长。
  • autoscale_idle_timeout_minutes 表示需要经过多少分钟后,自动缩放程序才可移除空闲工作器节点。 该值越小,纵向缩减就越激进。

使用 Ray 2.9.0 及更高版本,还可以设置 autoscale_min_worker_nodes,以防止 Ray 群集在 Ray 群集处于空闲状态时纵向缩减为零个工作器。

使用 Ray 客户端连接到远程 Ray 群集

在 Ray 2.9.3 中,通过调用 setup_ray_cluster API 创建 Ray 群集。 在同一笔记本中,调用 ray.init() API 以连接到此 Ray 群集。

对于未处于全局模式的 Ray 群集,请使用以下代码获取远程连接字符串:

使用以下方法获取远程连接字符串:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

使用此远程连接字符串连接到远程群集:

import ray
ray.init(remote_conn_str)

Ray 客户端不支持在 ray.data 模块中定义的 Ray 数据集 API。 解决方法是,可以将调用 Ray 数据集 API 的代码包装到远程 Ray 任务中,如以下代码所示:

import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

从 Spark 数据帧加载数据

若要将 Spark 数据帧作为光线数据集加载,首先必须将 Spark 数据帧保存到 UC 卷或 Databricks 文件系统(已弃用)作为 Parquet 格式。 为了安全地控制 Databricks 文件系统访问,Databricks 建议将云对象存储装载到 DBFS。 然后,可以使用以下帮助器方法从保存的 Spark 数据帧路径创建 ray.data.Dataset 实例:

import ray
import os
from urllib.parse import urlparse

def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

通过 Databricks SQL 仓库从 Unity Catalog 表加载数据

对于 Ray 2.8.0 及更高版本,可以调用 ray.data.read_databricks_tables API,以从 Databricks Unity Catalog 表加载数据。

首先,需要将 DATABRICKS_TOKEN 环境变量设置为 Databricks 仓库访问令牌。 如果未在 Databricks Runtime 上运行程序,请将 DATABRICKS_HOST 环境变量设置为 Databricks 工作区 URL,如下所示:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

然后,调用 ray.data.read_databricks_tables() 以从 Databricks SQL 仓库进行读取。

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

配置 Ray 头节点使用的资源

默认情况下,对于 Spark 上的 Ray 配置,Databricks 会将分配给 Ray 头节点的资源限制为:

  • 0 个 CPU 核心
  • 0 个 GPU
  • 128 MB 堆内存
  • 128 MB 对象存储内存

这是因为 Ray 头节点通常用于全局协调,而不是用于执行 Ray 任务。 Spark 驱动程序节点资源是与多个用户共享的,因此默认设置将资源保存在 Spark 驱动程序端。

使用 Ray 2.8.0 及更高版本,可以配置 Ray 头节点使用的资源。 在 setup_ray_cluster API 中使用下列参数:

  • num_cpus_head_node:设置 Ray 头节点使用的 CPU 核心数
  • num_gpus_head_node:设置 Ray 头节点使用的 GPU
  • object_store_memory_head_node:按 Ray 头节点设置对象存储内存大小

支持异类群集

为了更高效且经济有效的训练运行,可以创建 Spark 上的 Ray 群集,并在 Ray 头节点和 Ray 工作器节点之间设置不同的配置。 但是,所有 Ray 工作器节点都必须具有相同的配置。 Databricks 群集不完全支持异类群集,但可以通过设置群集策略创建具有不同驱动程序和工作器实例类型的 Databricks 群集。

例如:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

优化 Ray 群集配置

每个 Ray 工作器节点的建议配置是:

  • 每个 Ray 工作器节点至少有 4 个 CPU 核心。
  • 每个 Ray 工作器节点至少有 10GB 堆内存。

调用 ray.util.spark.setup_ray_cluster 时,Databricks 建议将 num_cpus_worker_node 设置为值 >= 4

有关为每个 Ray 工作器节点优化堆内存的详细信息,请参阅 Ray 工作器节点的内存分配

Ray 工作器节点的内存分配

每个 Ray 工作器节点使用两种类型的内存:堆内存和对象存储内存。 按如下所述确定每种类型的分配内存大小。

分配给每个 Ray 工作器节点的内存总量为:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES 是可以在 Spark 工作器节点上启动的 Ray 工作器节点的最大数量。 此数量由参数 num_cpus_worker_nodenum_gpus_worker_node 确定。

如果未设置参数 object_store_memory_per_node,则分配给每个 Ray 工作器节点的堆内存大小和对象存储内存大小为:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

如果设置了参数 object_store_memory_per_node

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

此外,每个 Ray 工作器节点的对象存储内存大小受操作系统共享内存的限制。 最大值为:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY 是为 Spark 工作器节点配置的 /dev/shm 磁盘大小。

最佳做法

如何为每个 Ray 工作器节点设置 CPU/GPU 编号?

Databricks 建议将 num_cpus_worker_node 设置为每个 Spark 工作器节点的 CPU 核心数,并将 num_gpus_worker_node 设置为每个 Spark 工作器节点的 GPU 数。 在此配置中,每个 Spark 工作器节点启动一个完全利用 Spark 工作器节点资源的 Ray 工作器节点。

GPU 群集配置

Ray 群集在 Databricks Spark 群集上运行。 常见方案是使用 Spark 作业和 Spark UDF 执行不需要 GPU 资源的简单数据预处理任务,然后使用 Ray 执行受益于 GPU 的复杂机器学习任务。 在这种情况下,Databricks 建议将 Spark 群集级别配置参数 spark.task.resource.gpu.amount 设置为 0,以便所有 Spark 数据帧转换和 Spark UDF 执行不使用 GPU 资源。

此配置的优点如下:

  • 它可提高 Spark 作业并行度,因为 GPU 实例类型的 CPU 核心数通常比 GPU 设备多很多。
  • 如果 Spark 群集是与多个用户共享的,则此配置可防止 Spark 作业与同时运行的 Ray 工作负载争用 GPU 资源。

如果在 Ray 任务中使用 mlflow 集成,请禁用 transformers 训练器 mlflow 集成

默认情况下,transformers 训练程序 MLflow 集成处于启用状态。 如果使用 Ray 训练训练,则 Ray 任务会失败,因为未为 Ray 任务配置 Databricks MLflow 服务凭据。

若要避免此问题,请将 databricks 群集配置中的 DISABLE_MLFLOW_INTEGRATION 环境变量设置为‘TRUE’。有关在 Ray 训练器任务中登录到 MLflow 的信息,请参阅“在 Ray 任务中使用 MLflow”部分“了解详细信息。

地址 Ray 远程函数选取错误

若要执行 Ray 任务,Ray 使用 pickle 序列化任务函数。 如果选取失败,请确定代码中发生失败的行。 通常,将 import 命令移动到任务函数中可解决常见的选取错误。 例如,datasets.load_dataset 是一个广泛使用的函数,该函数恰好在 Databricks Runtime 中修补,可能会呈现外部导入无法实现。 若要更正此问题,可以更新如下所示的代码:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

如果 Ray 任务意外终止并出现 OOM 错误,请禁用 Ray 内存监视器

在 Ray 2.9.3 中,Ray 内存监视器存在已知问题,导致 Ray 任务被错误地杀死。

若要解决此问题,请将环境变量 RAY_memory_monitor_refresh_ms 设置为 Databricks 群集配置中的 0 来禁用 Ray 内存监视器。

Spark 和 Ray 混合工作负荷的内存资源配置

如果在 Databricks 群集中运行混合 Spark 和 Ray 工作负荷,Databricks 建议将 Spark 执行器内存减少到较小的值,例如在 Databricks 群集配置中设置 spark.executor.memory 4g。这是因为在 Java 进程中运行的 Spark 执行程序会延迟触发垃圾回收 (GC)。 Spark 数据集缓存的内存压力相当高,导致 Ray 可以使用的可用内存减少。 为了避免潜在的 OOM 错误,Databricks 建议将配置的‘spark.executor.memory’值减少到小于默认值的值。

Spark 和 Ray 混合工作负荷的计算资源配置

如果在 Databricks 群集中运行混合 Spark 和 Ray 工作负载,请将 Spark 群集节点设置为可自动缩放,将 Ray 工作器节点设置为自动缩放,或者同时启用自动缩放。

例如,如果 Databricks 群集中有固定数量的工作器节点,请考虑启用 Ray-on-Spark 自动缩放,以便在没有运行 Ray 工作负荷时,Ray 群集会缩减。 因此,释放空闲的群集资源,以便 Spark 作业可以使用它们。

Spark 作业完成并启动 Ray 作业时,它会触发 Ray-on-Spark 群集以纵向扩展以满足处理需求。

还可以使 Databricks 群集和 Ray-on-spark 群集自动缩放。 具体而言,可以将 Databricks 群集自动缩放节点配置为最多 10 个节点,将 Ray-on-Spark 工作器节点配置为最多 4 个节点(每个 Spark 辅助角色有一个 Ray 工作器节点),使 Spark 可以免费分配最多 6 个 Spark 任务节点。 这意味着 Ray 工作负载最多可以同时使用 4 个节点资源,而 Spark 作业最多可以分配 6 个节点的资源。

将转换函数应用于数据批处理

分批处理数据时,Databricks 建议将 Ray 数据 API 与 map_batches 函数配合使用。 此方法可以更高效且可缩放,尤其是对于大型数据集,或者在执行受益于批处理的复杂计算时。 任何 Spark 数据帧都可以使用 ray.data.from_spark API 转换为 Ray 数据,并且可以使用 API ray.data.write_databricks_table 写到 databricks UC 表。

在 Ray 任务中使用 MLflow

若要在 Ray 任务中使用 MLflow,请配置以下内容:

  • Ray 任务中的 Databricks MLflow 凭据
  • MLflow 在 Spark 驱动程序端运行,该驱动程序将生成的 run_id 值传递给 Ray 任务。

以下代码是一个示例:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in Spark driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in Spark driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

在 Ray 任务中使用笔记本范围内的 Python 库或群集 Python 库

目前,Ray 有一个已知问题,Ray 任务无法使用笔记本范围的 Python 库或群集 Python 库。 若要解决此限制,请在启动 Ray-on-Spark 群集之前在笔记本中运行以下命令:

%pip install ray==<The Ray version you want to use> --force-reinstall

然后在笔记本中运行以下命令以重启 python 内核:

dbutils.library.restartPython()

在 Ray“仪表板执行组件”页上启用堆栈跟踪和火焰图

在 Ray“仪表板执行组件”页上,可以查看活动 Ray 执行组件的堆栈跟踪和火焰图。

若要查看此信息,请在启动 Ray 群集之前安装 py-spy

%pip install py-spy

关闭 Ray 群集

若要关闭在 Azure Databricks 上运行的 Ray 群集,请调用 ray.utils.spark.shutdown_ray_cluster API。

注意

Ray 群集在以下情况下也会关闭:

  • 从 Azure Databricks 群集中分离交互式笔记本。
  • Azure Databricks 作业已完成。
  • Azure Databricks 群集已重启或终止。
  • 指定的空闲时间没有活动。

示例笔记本

以下笔记本演示如何在 Databricks 上创建 Ray 群集和运行 Ray 应用程序。

Spark 上的 Ray 初学者笔记本

获取笔记本

限制

  • 不支持多用户共享 Azure Databricks 群集(启用隔离模式)。
  • 使用 %pip 安装包时,Ray 群集将会关闭。 请确保在使用 %pip 安装完所有库后启动 Ray。
  • 使用替代 ray.util.spark.setup_ray_cluster 中的配置的集成可能会导致 Ray 群集变得不稳定,并可能导致 Ray 上下文崩溃。 例如,使用 xgboost_ray 包,并使用超过 Ray 群集配置的执行组件或 cpus_per_actor 配置设置 RayParams,可能会导致 Ray 群集以无提示方式崩溃。