Поделиться через


Рабочая нагрузка с несколькими GPU и несколькими узлами

Вы можете запустить распределенную рабочую нагрузку на нескольких GPU ( в одном узле или нескольких узлах) с помощью API Python без сервера. API предоставляет простой унифицированный интерфейс, который абстрагирует сведения о подготовке GPU, настройке среды и распределении рабочей нагрузки. При минимальных изменениях кода вы можете легко перейти от обучения на одном GPU к распределенной обработке с использованием удаленных GPU из того же цифрового блокнота.

Быстрый старт

Бессерверный API GPU для распределенного обучения предварительно установлен в бессерверных вычислительных средах GPU для записных книжек Databricks. Рекомендуется использовать среду GPU 4 или выше. Чтобы использовать его для распределенного distributed обучения, импортируйте и используйте декоратор для распространения функции обучения.

В приведенном ниже фрагменте кода показано базовое использование @distributed.

# Import the distributed decorator
from serverless_gpu import distributed

# Decorate your training function with @distributed and specify the number of GPUs, the GPU type,
# and whether or not the GPUs are remote
@distributed(gpus=8, gpu_type='A10', remote=True)
def run_train():
    ...

Ниже приведен полный пример обучения модели многослойного перцептрона (MLP) на 8 узлах A10 GPU из блокнота:

  1. Настройте модель и определите служебные функции.

    
    # Define the model
    import os
    import torch
    import torch.distributed as dist
    import torch.nn as nn
    
    def setup():
        dist.init_process_group("nccl")
        torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    
    def cleanup():
        dist.destroy_process_group()
    
    class SimpleMLP(nn.Module):
        def __init__(self, input_dim=10, hidden_dim=64, output_dim=1):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(input_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, output_dim)
            )
    
        def forward(self, x):
            return self.net(x)
    
  2. Импортируйте библиотеку serverless_gpu и распределенный модуль.

    import serverless_gpu
    from serverless_gpu import distributed
    
  3. Оберните код обучения модели в функцию и декорируйте функцию декоратором @distributed.

    @distributed(gpus=8, gpu_type='A10', remote=True)
    def run_train(num_epochs: int, batch_size: int) -> None:
        import mlflow
        import torch.optim as optim
        from torch.nn.parallel import DistributedDataParallel as DDP
        from torch.utils.data import DataLoader, DistributedSampler, TensorDataset
    
        # 1. Set up multi node environment
        setup()
        device = torch.device(f"cuda:{int(os.environ['LOCAL_RANK'])}")
    
        # 2. Apply the Torch distributed data parallel (DDP) library for data-parellel training.
        model = SimpleMLP().to(device)
        model = DDP(model, device_ids=[device])
    
        # 3. Create and load dataset.
        x = torch.randn(5000, 10)
        y = torch.randn(5000, 1)
    
        dataset = TensorDataset(x, y)
        sampler = DistributedSampler(dataset)
        dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size)
    
        # 4. Define the training loop.
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        loss_fn = nn.MSELoss()
    
        for epoch in range(num_epochs):
            sampler.set_epoch(epoch)
            model.train()
            total_loss = 0.0
            for step, (xb, yb) in enumerate(dataloader):
                xb, yb = xb.to(device), yb.to(device)
                optimizer.zero_grad()
                loss = loss_fn(model(xb), yb)
                # Log loss to MLflow metric
                mlflow.log_metric("loss", loss.item(), step=step)
    
                loss.backward()
                optimizer.step()
                total_loss += loss.item() * xb.size(0)
    
            mlflow.log_metric("total_loss", total_loss)
            print(f"Total loss for epoch {epoch}: {total_loss}")
    
        cleanup()
    
  4. Выполните распределенное обучение, вызвав распределенную функцию с определяемыми пользователем аргументами.

    run_train.distributed(num_epochs=3, batch_size=1)
    
  5. При выполнении создаётся ссылка на запуск MLflow в выходных данных ячейки записной книжки. Щелкните ссылку запуска MLflow или найдите ее на панели "Эксперимент ", чтобы просмотреть результаты выполнения.

    Выходные данные в ячейке блокнота

Сведения о распределённом выполнении

Бессерверный API GPU состоит из нескольких ключевых компонентов:

  • Менеджер вычислений: распределяет и управляет ресурсами
  • Среда выполнения. Управление средами Python и зависимостями
  • Загрузчик: управление выполнением и мониторингом заданий

При выполнении в распределенном режиме:

  • Функция сериализуется и распределяется по указанному количеству GPU
  • Каждый GPU запускает копию функции с одинаковыми параметрами
  • Среда синхронизирована на всех узлах
  • Результаты собираются и возвращаются из всех GPU

Если remote задано True значением, рабочая нагрузка распределяется на удаленные GPU. Если remote задано значение False, рабочая нагрузка выполняется на одном узле GPU, подключенном к текущему ноутбуку. Если узел содержит несколько микросхем GPU, все они будут использоваться.

API поддерживает популярные библиотеки параллельного обучения, такие как распределенная параллель данных (DDP), полностью сегментированный параллель данных ( FSDP), DeepSpeed и Ray.

Более реальные сценарии распределенного обучения можно найти с помощью различных библиотек в примерах записных книжек.

Запуск с помощью Ray

API безсерверного GPU также поддерживает запуск распределенного обучения с Ray посредством декоратора @ray_launch, который накладывается поверх @distributed. Каждая ray_launch задача сначала инициализирует rendezvous в библиотеке torch.distributed, чтобы определить ведущий рабочий процесс Ray и собрать IP-адреса. Запуск с нулевым рангом ray start --head (с экспортом метрик, если включен), устанавливает RAY_ADDRESS, и запускает декорированную функцию в качестве драйвера Ray. Другие узлы присоединяются через ray start --address и подождите, пока драйвер не напишет маркер завершения.

Дополнительные сведения о конфигурации:

  • Чтобы включить коллекцию системных метрик Ray на каждом узле, используйте RayMetricsMonitor с remote=True.
  • Определите параметры среды выполнения Ray (актеры, наборы данных, группы размещения и планирование) внутри декорированной функции с помощью стандартных API Ray.
  • Управление элементами управления на уровне кластера (количество и тип GPU, удаленный и локальный режим, асинхронное поведение и переменные среды пула Databricks) за пределами функции в аргументах декоратора или среде записной книжки.

В приведенном ниже примере показано, как использовать @ray_launch:

from serverless_gpu.ray import ray_launch
@ray_launch(gpus=16, remote=True, gpu_type='A10')
def foo():
    import os
    import ray
    print(ray.state.available_resources_per_node())
    return 1
foo.distributed()

Полный пример доступен в этой записной книжке, в которой Ray используется для обучения нейронной сети Resnet18 на нескольких GPU A10.

Этот API также можно использовать для вызова Ray Data, масштабируемой библиотеки обработки данных для рабочих нагрузок ИИ, чтобы выполнить распределённое пакетное вычисление на LLM. См. примеры vllm и sglang .

FAQs

Где должен размещаться код загрузки данных?

При использовании безсерверного API GPU для распределенного обучения переместите код загрузки данных в декоратор @distributed. Размер набора данных может превышать максимальный размер, разрешенный пикл, поэтому рекомендуется генерировать набор данных внутри декоратора, как показано ниже:

from serverless_gpu import distributed

# this may cause pickle error
dataset = get_dataset(file_path)
@distributed(gpus=8, remote=True)
def run_train():
  # good practice
  dataset = get_dataset(file_path)
  ....

Можно ли использовать зарезервированные пулы GPU?

Если в вашей рабочей области доступен зарезервированный пул GPU (обратитесь к администратору) и вы укажете remote по True в декораторе @distributed, то по умолчанию рабочая нагрузка будет запущена на зарезервированном пуле GPU. Если вы хотите использовать пул GPU по запросу, задайте для переменной DATABRICKS_USE_RESERVED_GPU_POOL среды значение False перед вызовом распределенной функции, как показано ниже:

import os
os.environ['DATABRICKS_USE_RESERVED_GPU_POOL'] = 'False'
@distributed(gpus=8, remote=True)
def run_train():
    ...

Подробнее

Справочную информацию по API можно найти в документации безсерверного GPU API для Python.