多 GPU 和多节点工作负荷

可以使用 无服务器 GPU Python API 在单个节点内或跨多个节点启动跨多个 GPU 的分布式工作负荷。 API 提供了一个简单的统一接口,用于抽象化 GPU 预配、环境设置和工作负载分发的详细信息。 通过最少的代码更改,你可以从单 GPU 训练无缝移动到同一笔记本中远程 GPU 之间的分布式执行。

快速入门

用于分布式训练的无服务器 GPU API 预安装在 Databricks 笔记本的无服务器 GPU 计算环境中。 建议 使用 GPU 环境 4 及更高版本。 若要将其用于分布式训练,请导入并使用 distributed 修饰器来分配训练函数。

下面的代码片段显示了@distributed的基本用法:

# Import the distributed decorator
from serverless_gpu import distributed

# Decorate your training function with @distributed and specify the number of GPUs, the GPU type,
# and whether or not the GPUs are remote
@distributed(gpus=8, gpu_type='A10', remote=True)
def run_train():
    ...

下面是从笔记本中训练 8 A10 GPU 节点上的多层感知器(MLP)模型的完整示例:

  1. 设置模型并定义实用工具函数。

    
    # Define the model
    import os
    import torch
    import torch.distributed as dist
    import torch.nn as nn
    
    def setup():
        dist.init_process_group("nccl")
        torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    
    def cleanup():
        dist.destroy_process_group()
    
    class SimpleMLP(nn.Module):
        def __init__(self, input_dim=10, hidden_dim=64, output_dim=1):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(input_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, output_dim)
            )
    
        def forward(self, x):
            return self.net(x)
    
  2. 导入 serverless_gpu 库和 分布式 模块。

    import serverless_gpu
    from serverless_gpu import distributed
    
  3. 将模型训练代码包装在函数中,并使用修饰器修饰函数 @distributed

    @distributed(gpus=8, gpu_type='A10', remote=True)
    def run_train(num_epochs: int, batch_size: int) -> None:
        import mlflow
        import torch.optim as optim
        from torch.nn.parallel import DistributedDataParallel as DDP
        from torch.utils.data import DataLoader, DistributedSampler, TensorDataset
    
        # 1. Set up multi node environment
        setup()
        device = torch.device(f"cuda:{int(os.environ['LOCAL_RANK'])}")
    
        # 2. Apply the Torch distributed data parallel (DDP) library for data-parellel training.
        model = SimpleMLP().to(device)
        model = DDP(model, device_ids=[device])
    
        # 3. Create and load dataset.
        x = torch.randn(5000, 10)
        y = torch.randn(5000, 1)
    
        dataset = TensorDataset(x, y)
        sampler = DistributedSampler(dataset)
        dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size)
    
        # 4. Define the training loop.
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        loss_fn = nn.MSELoss()
    
        for epoch in range(num_epochs):
            sampler.set_epoch(epoch)
            model.train()
            total_loss = 0.0
            for step, (xb, yb) in enumerate(dataloader):
                xb, yb = xb.to(device), yb.to(device)
                optimizer.zero_grad()
                loss = loss_fn(model(xb), yb)
                # Log loss to MLflow metric
                mlflow.log_metric("loss", loss.item(), step=step)
    
                loss.backward()
                optimizer.step()
                total_loss += loss.item() * xb.size(0)
    
            mlflow.log_metric("total_loss", total_loss)
            print(f"Total loss for epoch {epoch}: {total_loss}")
    
        cleanup()
    
  4. 使用用户定义的参数调用分布式函数来执行分布式训练。

    run_train.distributed(num_epochs=3, batch_size=1)
    
  5. 执行时,将在笔记本单元输出中生成 MLflow 运行链接。 单击 MLflow 运行链接或在 “试验 ”面板中找到它以查看运行结果。

    笔记本单元格中的输出

分布式执行详细信息

无服务器 GPU API 由多个关键组件组成:

  • 计算管理器:处理资源分配和管理
  • 运行时环境:管理 Python 环境和依赖项
  • 启动器:管理和协调作业执行和监控

在分布式模式下运行时:

  • 函数序列化并分布在指定数量的 GPU 中
  • 每个 GPU 都使用相同的参数运行函数的副本
  • 环境在所有节点上同步
  • 从所有 GPU 收集并返回结果

如果 remote 设置为 True,则工作负荷将分发到远程 GPU 上。 如果 remote 设置为 False,则工作负荷在连接到当前笔记本的单个 GPU 节点上运行。 如果节点有多个 GPU 芯片,则会使用所有这些芯片。

该 API 支持常用的并行训练库,例如 分布式数据并行 (DDP)、 完全分片数据并行 (FSDP)、 DeepSpeedRay

可以使用 笔记本示例中的各种库找到更真实的分布式训练方案。

使用 Ray 启动

无服务器 GPU API 还支持使用 Ray 启动分布式训练,方法是使用在 @distributed 之上的 @ray_launch 修饰器。 每个 ray_launch 任务首先启动一个 PyTorch 分布式会合,以确定 Ray 主节点工作者并收集 IP。 从零级开始 ray start --head (如果启用了指标导出),设置 RAY_ADDRESS,然后将您装饰过的函数作为 Ray 的驱动程序运行。 其他节点通过 ray start --address 联接并等待驱动程序写入完成标记。

其他配置详细信息:

  • 若要在每个节点上启用 Ray 系统指标收集,请使用 RayMetricsMonitor 配合 remote=True
  • 您可以使用标准 Ray API 在修饰后的函数 定义 Ray 的运行时选项(演员、数据集、放置组和调度)。
  • 管理整个群集的控件(GPU 数量和类型、远程与本地模式、异步行为以及 Databricks 池环境变量)应在修饰器参数或笔记本环境的函数外部进行。

下面的示例演示如何使用 @ray_launch

from serverless_gpu.ray import ray_launch
@ray_launch(gpus=16, remote=True, gpu_type='A10')
def foo():
    import os
    import ray
    print(ray.state.available_resources_per_node())
    return 1
foo.distributed()

关于完整示例,请参阅此笔记本,它启动 Ray 以在多个 A10 GPU 上训练 Resnet18 神经网络。

FAQs

应在何处放置数据加载代码?

使用 无服务器 GPU API 进行分布式训练时,在 @distributed 修饰器内移动数据加载代码。 数据集大小可以超出 pickle 允许的最大大小,因此建议在装饰器内生成数据集,如下所示:

from serverless_gpu import distributed

# this may cause pickle error
dataset = get_dataset(file_path)
@distributed(gpus=8, remote=True)
def run_train():
  # good practice
  dataset = get_dataset(file_path)
  ....

是否可以使用保留的 GPU 池?

如果您工作区中的预留 GPU 池可用(请与管理员核实),并且您在@distributed修饰器中指定remoteTrue,则默认情况下,任务将启动在预留的 GPU 池上。 如果您希望使用按需 GPU 池,请在调用分布式函数之前将环境变量设置为 False,如下所示:

import os
os.environ['DATABRICKS_USE_RESERVED_GPU_POOL'] = 'False'
@distributed(gpus=8, remote=True)
def run_train():
    ...

了解详细信息

有关 API 参考,请参阅 无服务器 GPU Python API 文档。