通过


多 GPU 工作负荷

重要

此功能在 Beta 版中。 工作区管理员可以从 预览 页控制对此功能的访问。 请参阅 Manage Azure Databricks 预览版

可以使用 无服务器 GPU Python API 在单个节点上跨多个 GPU 启动分布式工作负荷。 API 提供了一个简单的统一接口,用于抽象化 GPU 预配、环境设置和工作负载分发的详细信息。 通过最少的代码更改,可以从同一笔记本无缝地从单 GPU 训练迁移到多 GPU 分布式执行。

支持的框架

API @distributed 与主要的分布式训练库集成:

  • PyTorch 分布式数据并行 (DDP) - 标准多 GPU 数据并行度。
  • 完全分片数据并行 (FSDP) - 大型模型的内存高效训练。
  • DeepSpeed — Microsoft用于大型模型训练的优化库。

serverless_gpu API 与 TorchDistributor

下表将 API 与 serverless_gpu 进行比较@distributed

功能 serverless_gpu @distributed API 火炬分配器
基础结构 完全无服务器,无群集管理 需要具有 GPU 工作节点的 Spark 群集
Setup 单个修饰器,最小配置 需要 Spark 群集和 TorchDistributor 设置
框架支持 PyTorch DDP、FSDP、DeepSpeed 主要是 PyTorch DDP
数据加载 在修饰器内,使用 Unity 目录卷 通过 Spark 或文件系统

Databricks 上的新深度学习工作负载推荐使用serverless_gpuAPI。 TorchDistributor 仍可用于与 Spark 群集紧密耦合的工作负载。

快速入门

当在 Databricks 笔记本和作业中连接到无服务器 GPU 时,用于分布式训练的无服务器 GPU API 会被预安装。 建议 使用 GPU 环境 4 及更高版本。 若要将其用于分布式训练,请导入并使用 distributed 修饰器来分配训练函数。

将模型训练代码包装在函数中,并使用修饰器修饰函数 @distributed 。 修饰的函数将成为分布式执行的入口点 - 应在此函数内定义所有训练逻辑、数据加载和模型初始化。

警告

中的gpu_type@distributed参数必须与笔记本连接到的加速器类型匹配。 例如, @distributed(gpus=8, gpu_type='H100') 要求笔记本连接到 H100 加速器。 使用不匹配的加速器类型(例如在指定 H100 时连接到 A10)将导致工作负荷失败。

下面的代码片段显示了@distributed的基本用法:

# Import the distributed decorator
from serverless_gpu import distributed

# Decorate your training function with @distributed and specify the number of GPUs and GPU type
@distributed(gpus=8, gpu_type='H100')
def run_train():
    ...

下面是从笔记本中训练 8 H100 GPU 上的多层感知器(MLP)模型的完整示例:

  1. 设置模型并定义实用工具函数。

    
    # Define the model
    import os
    import torch
    import torch.distributed as dist
    import torch.nn as nn
    
    def setup():
        dist.init_process_group("nccl")
        torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    
    def cleanup():
        dist.destroy_process_group()
    
    class SimpleMLP(nn.Module):
        def __init__(self, input_dim=10, hidden_dim=64, output_dim=1):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(input_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, output_dim)
            )
    
        def forward(self, x):
            return self.net(x)
    
  2. 导入 serverless_gpu 库和 分布式 模块。

    import serverless_gpu
    from serverless_gpu import distributed
    
  3. 将模型训练代码包装在函数中,并使用修饰器修饰函数 @distributed

    @distributed(gpus=8, gpu_type='H100')
    def run_train(num_epochs: int, batch_size: int) -> None:
        import mlflow
        import torch.optim as optim
        from torch.nn.parallel import DistributedDataParallel as DDP
        from torch.utils.data import DataLoader, DistributedSampler, TensorDataset
    
        # 1. Set up multi-GPU environment
        setup()
        device = torch.device(f"cuda:{int(os.environ['LOCAL_RANK'])}")
    
        # 2. Apply the Torch distributed data parallel (DDP) library for data-parellel training.
        model = SimpleMLP().to(device)
        model = DDP(model, device_ids=[device])
    
        # 3. Create and load dataset.
        x = torch.randn(5000, 10)
        y = torch.randn(5000, 1)
    
        dataset = TensorDataset(x, y)
        sampler = DistributedSampler(dataset)
        dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size)
    
        # 4. Define the training loop.
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        loss_fn = nn.MSELoss()
    
        for epoch in range(num_epochs):
            sampler.set_epoch(epoch)
            model.train()
            total_loss = 0.0
            for step, (xb, yb) in enumerate(dataloader):
                xb, yb = xb.to(device), yb.to(device)
                optimizer.zero_grad()
                loss = loss_fn(model(xb), yb)
                # Log loss to MLflow metric
                mlflow.log_metric("loss", loss.item(), step=step)
    
                loss.backward()
                optimizer.step()
                total_loss += loss.item() * xb.size(0)
    
            mlflow.log_metric("total_loss", total_loss)
            print(f"Total loss for epoch {epoch}: {total_loss}")
    
        cleanup()
    
  4. 使用用户定义的参数调用分布式函数来执行分布式训练。

    run_train.distributed(num_epochs=3, batch_size=1)
    
  5. 执行时,将在笔记本单元输出中生成 MLflow 运行链接。 单击 MLflow 运行链接或在 “试验 ”面板中找到它以查看运行结果。

分布式执行详细信息

无服务器 GPU API 由多个关键组件组成:

  • 计算管理器:处理资源分配和管理
  • 运行时环境:管理 Python 环境和依赖项
  • 启动器:管理和协调作业执行和监控

在分布式模式下运行时:

  • 函数序列化并分布在指定数量的 GPU 中
  • 每个 GPU 都使用相同的参数运行函数的副本
  • 环境在所有 GPU 上保持同步
  • 从所有 GPU 收集并返回结果

该 API 支持常用的并行训练库,例如 分布式数据并行 (DDP)、 完全分片数据并行FSDP)、DeepSpeed

可以使用 笔记本示例中的各种库找到更真实的分布式训练方案。

常见问题

应在何处放置数据加载代码?

使用 无服务器 GPU API 进行分布式训练时,在 @distributed 修饰器内移动数据加载代码。 数据集大小可以超出 pickle 允许的最大大小,因此建议在装饰器内生成数据集,如下所示:

from serverless_gpu import distributed

# this may cause pickle error
dataset = get_dataset(file_path)
@distributed(gpus=8, gpu_type='H100')
def run_train():
  # good practice
  dataset = get_dataset(file_path)
  ....

了解详细信息

有关 API 参考,请参阅 无服务器 GPU Python API 文档。