Partilhar via


Carga de trabalho multi-GPU e multi-nós

Pode lançar cargas de trabalho distribuídas em múltiplas GPUs — seja dentro de um único nó ou entre vários nós — usando a API Python da GPU Serverless. A API fornece uma interface simples e unificada que abstrai os detalhes do provisionamento da GPU, da configuração do ambiente e da distribuição da carga de trabalho. Com alterações mínimas de código, você pode passar facilmente do treinamento de GPU única para a execução distribuída entre GPUs remotas do mesmo notebook.

Início rápido

A API serverless de GPU para treino distribuído vem pré-instalada em ambientes de computação de GPU sem servidor para notebooks Databricks. Recomendamos o ambiente GPU 4 e superiores. Para usá-lo para treino distribuído, importe e utilize o decorador distributed para distribuir a sua função de treino.

O excerto de código abaixo mostra a utilização básica de @distributed:

# Import the distributed decorator
from serverless_gpu import distributed

# Decorate your training function with @distributed and specify the number of GPUs, the GPU type,
# and whether or not the GPUs are remote
@distributed(gpus=8, gpu_type='A10', remote=True)
def run_train():
    ...

Abaixo está um exemplo completo que treina um modelo de perceptrão multicamada (MLP) em 8 nós de GPU A10 a partir de um portátil:

  1. Configura o teu modelo e define funções de utilidade.

    
    # Define the model
    import os
    import torch
    import torch.distributed as dist
    import torch.nn as nn
    
    def setup():
        dist.init_process_group("nccl")
        torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    
    def cleanup():
        dist.destroy_process_group()
    
    class SimpleMLP(nn.Module):
        def __init__(self, input_dim=10, hidden_dim=64, output_dim=1):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(input_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, output_dim)
            )
    
        def forward(self, x):
            return self.net(x)
    
  2. Importa a biblioteca serverless_gpu e o módulo `distributed`.

    import serverless_gpu
    from serverless_gpu import distributed
    
  3. Envolve o código de treino do modelo numa função e decora a função com o @distributed decorador.

    @distributed(gpus=8, gpu_type='A10', remote=True)
    def run_train(num_epochs: int, batch_size: int) -> None:
        import mlflow
        import torch.optim as optim
        from torch.nn.parallel import DistributedDataParallel as DDP
        from torch.utils.data import DataLoader, DistributedSampler, TensorDataset
    
        # 1. Set up multi node environment
        setup()
        device = torch.device(f"cuda:{int(os.environ['LOCAL_RANK'])}")
    
        # 2. Apply the Torch distributed data parallel (DDP) library for data-parellel training.
        model = SimpleMLP().to(device)
        model = DDP(model, device_ids=[device])
    
        # 3. Create and load dataset.
        x = torch.randn(5000, 10)
        y = torch.randn(5000, 1)
    
        dataset = TensorDataset(x, y)
        sampler = DistributedSampler(dataset)
        dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size)
    
        # 4. Define the training loop.
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        loss_fn = nn.MSELoss()
    
        for epoch in range(num_epochs):
            sampler.set_epoch(epoch)
            model.train()
            total_loss = 0.0
            for step, (xb, yb) in enumerate(dataloader):
                xb, yb = xb.to(device), yb.to(device)
                optimizer.zero_grad()
                loss = loss_fn(model(xb), yb)
                # Log loss to MLflow metric
                mlflow.log_metric("loss", loss.item(), step=step)
    
                loss.backward()
                optimizer.step()
                total_loss += loss.item() * xb.size(0)
    
            mlflow.log_metric("total_loss", total_loss)
            print(f"Total loss for epoch {epoch}: {total_loss}")
    
        cleanup()
    
  4. Execute o treino distribuído chamando a função distribuída com argumentos definidos pelo utilizador.

    run_train.distributed(num_epochs=3, batch_size=1)
    
  5. Quando for executada, é gerado um link de execução do MLflow na saída da célula do notebook. Clique no link da execução do MLflow ou encontre-o no painel Experimentos para ver os resultados da execução.

    Saída na célula do notebook

Detalhes de execução distribuída

A API de GPU serverless consiste em vários componentes-chave:

  • Gestor de computação: Gere a alocação e gestão de recursos
  • Ambiente de execução: Gere ambientes e dependências Python
  • Lançador: Orquestra a execução e monitorização do trabalho

Quando executar em modo distribuído:

  • A função é serializada e distribuída pelo número especificado de GPUs
  • Cada GPU executa uma cópia da função com os mesmos parâmetros
  • O ambiente está sincronizado em todos os nós
  • Os resultados são recolhidos e devolvidos de todas as GPUs

Se remote for definido para True, a carga de trabalho é distribuída nas GPUs remotas. Se remote for definido para False, a carga de trabalho corre no único nó da GPU ligado ao portátil atual. Se o nó tiver vários chips GPU, todos eles serão utilizados.

A API suporta bibliotecas de treinamento paralelas populares, como DDP (Distributed Data Parallel ), FSDP (Fully Sharded Data Parallel ), DeepSpeed e Ray.

Pode encontrar mais cenários reais de treino distribuído usando as várias bibliotecas em exemplos de cadernos.

Iniciar com Ray

A API serverless da GPU também suporta o lançamento de treino distribuído usando Ray usando o @ray_launch decorador, que é sobreposto a @distributed. Cada ray_launch tarefa inicia primeiro um sistema de rendezvous distribuído por PyTorch para determinar o trabalhador principal do Ray e recolher os IPs. Rank-zero inicia ray start --head (com exportação de métricas, se estiver ativa), define RAY_ADDRESS e executa a sua função decorada como o driver Ray. Outros nós juntam-se via ray start --address e aguardam até que o driver escreva um marcador de conclusão.

Detalhes adicionais de configuração:

  • Para ativar a recolha de métricas do sistema Ray em cada nó, utilize RayMetricsMonitor com remote=True.
  • Defina opções de tempo de execução do Ray (actores, conjuntos de dados, grupos de posicionamento e agendamento) dentro da sua função decorada usando APIs padrão do Ray.
  • Gerir controlos a nível de cluster (contagem e tipo de GPU, modo remoto vs. local, comportamento assíncrono e variáveis do ambiente Databricks pool) fora da função no ambiente de argumentos do decorador ou notebook.

O exemplo abaixo mostra como usar @ray_launch:

from serverless_gpu.ray import ray_launch
@ray_launch(gpus=16, remote=True, gpu_type='A10')
def foo():
    import os
    import ray
    print(ray.state.available_resources_per_node())
    return 1
foo.distributed()

Para um exemplo completo, veja este notebook, que inicia o Ray para treinar uma rede neural Resnet18 em múltiplas GPUs A10.

FAQs

Onde deve ser colocado o código de carregamento dos dados?

Ao usar a API de GPU Serverless para treinamento distribuído, mova o código de carregamento de dados para dentro do decorador @distributed. O tamanho do conjunto de dados pode exceder o tamanho máximo permitido pelo pickle, por isso é recomendado criar o conjunto de dados dentro do decorador, como mostrado abaixo:

from serverless_gpu import distributed

# this may cause pickle error
dataset = get_dataset(file_path)
@distributed(gpus=8, remote=True)
def run_train():
  # good practice
  dataset = get_dataset(file_path)
  ....

Posso usar pools de GPU reservados?

Se houver um pool de GPU reservado disponível (por favor, confirme com o seu administrador) no seu espaço de trabalho e especificar remote a True no @distributed decorador, a carga de trabalho será lançada no pool de GPU reservado por padrão. Se quiser usar o pool de GPU on-demand, por favor defina a variável DATABRICKS_USE_RESERVED_GPU_POOL ambiente para False antes de chamar a função distribuída, como mostrado abaixo:

import os
os.environ['DATABRICKS_USE_RESERVED_GPU_POOL'] = 'False'
@distributed(gpus=8, remote=True)
def run_train():
    ...

Mais informações

Para referência à API, consulte a documentação da API Python para GPU Serverless .