Dela via


Multi-GPU och arbetsbelastning med flera noder

Du kan starta distribuerad arbetsbelastning över flera GPU:er – antingen inom en enskild nod eller över flera noder – med hjälp av Det serverlösa GPU Python-API:et. API:et tillhandahåller ett enkelt, enhetligt gränssnitt som sammanfattar informationen om GPU-etablering, miljökonfiguration och arbetsbelastningsdistribution. Med minimala kodändringar kan du sömlöst övergå från utbildning med en enda GPU till distribuerad körning över fjärranslutna GPU:er från samma notebook.

Snabbstart

Det serverlösa GPU-API:et för distribuerad träning är förinstallerat i serverlösa GPU-beräkningsmiljöer för Databricks notebooks. Vi rekommenderar GPU-miljö 4 och senare. Om du vill använda den för distribuerad träning ska du importera och använda dekoratören distributed för att distribuera träningsfunktionen.

Kodfragmentet nedan visar den grundläggande användningen av @distributed:

# Import the distributed decorator
from serverless_gpu import distributed

# Decorate your training function with @distributed and specify the number of GPUs, the GPU type,
# and whether or not the GPUs are remote
@distributed(gpus=8, gpu_type='A10', remote=True)
def run_train():
    ...

Nedan följer ett fullständigt exempel som tränar en multilayer perceptron (MLP) modell på 8 A10 GPU-noder från en anteckningsbok.

  1. Konfigurera din modell och definiera verktygsfunktioner.

    
    # Define the model
    import os
    import torch
    import torch.distributed as dist
    import torch.nn as nn
    
    def setup():
        dist.init_process_group("nccl")
        torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    
    def cleanup():
        dist.destroy_process_group()
    
    class SimpleMLP(nn.Module):
        def __init__(self, input_dim=10, hidden_dim=64, output_dim=1):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(input_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, output_dim)
            )
    
        def forward(self, x):
            return self.net(x)
    
  2. Importera serverless_gpu-biblioteket och den distribuerade modulen.

    import serverless_gpu
    from serverless_gpu import distributed
    
  3. Lägg in modellträningskoden i en funktion och använd dekoratören @distributed på funktionen.

    @distributed(gpus=8, gpu_type='A10', remote=True)
    def run_train(num_epochs: int, batch_size: int) -> None:
        import mlflow
        import torch.optim as optim
        from torch.nn.parallel import DistributedDataParallel as DDP
        from torch.utils.data import DataLoader, DistributedSampler, TensorDataset
    
        # 1. Set up multi node environment
        setup()
        device = torch.device(f"cuda:{int(os.environ['LOCAL_RANK'])}")
    
        # 2. Apply the Torch distributed data parallel (DDP) library for data-parellel training.
        model = SimpleMLP().to(device)
        model = DDP(model, device_ids=[device])
    
        # 3. Create and load dataset.
        x = torch.randn(5000, 10)
        y = torch.randn(5000, 1)
    
        dataset = TensorDataset(x, y)
        sampler = DistributedSampler(dataset)
        dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size)
    
        # 4. Define the training loop.
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        loss_fn = nn.MSELoss()
    
        for epoch in range(num_epochs):
            sampler.set_epoch(epoch)
            model.train()
            total_loss = 0.0
            for step, (xb, yb) in enumerate(dataloader):
                xb, yb = xb.to(device), yb.to(device)
                optimizer.zero_grad()
                loss = loss_fn(model(xb), yb)
                # Log loss to MLflow metric
                mlflow.log_metric("loss", loss.item(), step=step)
    
                loss.backward()
                optimizer.step()
                total_loss += loss.item() * xb.size(0)
    
            mlflow.log_metric("total_loss", total_loss)
            print(f"Total loss for epoch {epoch}: {total_loss}")
    
        cleanup()
    
  4. Kör den distribuerade träningen genom att anropa den distribuerade funktionen med användardefinierade argument.

    run_train.distributed(num_epochs=3, batch_size=1)
    
  5. När en MLflow-körning utförs genereras en länk i notebookcellens utdata. Klicka på MLflow-körningslänken eller hitta den i panelen Experiment för att se körningsresultatet.

    Utdata i anteckningsbokens cell

Detaljer om distribuerad körning

Serverlöst GPU-API består av flera viktiga komponenter:

  • Compute Manager: Hanterar resursallokering och hantering
  • Körningsmiljö: Hanterar Python-miljöer och beroenden
  • Launcher: Samordnar uppdragskörning och övervakning

När du kör i distribuerat läge:

  • Funktionen serialiseras och distribueras över det angivna antalet GPU:er
  • Varje GPU kör en kopia av funktionen med samma parametrar
  • Miljön synkroniseras mellan alla noder
  • Resultat samlas in och returneras från alla GPU:er

Om remote är inställt Truepå distribueras arbetsbelastningen på de fjärranslutna GPU:erna. Om remote är inställt på False körs arbetsbelastningen på den enda GPU-noden som är ansluten till i den aktuella notebook. Om noden har flera GPU-chips används alla.

API:et stöder populära parallella träningsbibliotek som DDP (Distributed Data Parallel ), Fully Sharded Data Parallel (FSDP), DeepSpeed och Ray.

Du hittar fler verkliga distribuerade träningsscenarier med hjälp av de olika biblioteken i notebook-exempel.

Starta med Ray

Det serverlösa GPU-API:et har också stöd för att starta distribuerad träning med Ray, med hjälp av @ray_launch-dekoratören, som är skiktad ovanpå @distributed. Varje ray_launch uppgift startar först en torch-distribuerad rendezvous för att bestämma Ray-headarbetaren och samla IP-adresser. Rank-zero startar ray start --head (med måttexport om aktiverat), anger RAY_ADDRESSoch kör din dekorerade funktion som Ray-drivrutin. Andra noder ansluts via ray start --address och väntar tills drivrutinen skriver en slutförandemarkör.

Ytterligare konfigurationsinformation:

  • Om du vill aktivera Ray-systemmåttsamling på varje nod använder du RayMetricsMonitor med remote=True.
  • Definiera alternativ för Ray-körning (aktörer, datamängder, placeringsgrupper och schemaläggning) i din dekorerade funktion med hjälp av standard Ray-API:er.
  • Hantera klusteromfattande kontroller (GPU-antal och typ, fjärrläge jämfört med lokalt läge, asynkront beteende och Databricks-poolmiljövariabler) utanför funktionen i dekoratorargumenten eller notebook-miljön.

Exemplet nedan visar hur du använder @ray_launch:

from serverless_gpu.ray import ray_launch
@ray_launch(gpus=16, remote=True, gpu_type='A10')
def foo():
    import os
    import ray
    print(ray.state.available_resources_per_node())
    return 1
foo.distributed()

Ett fullständigt exempel finns i den här notebook-filen, som startar Ray för att träna ett neuralt Resnet18-nätverk på flera A10 GPU:er.

FAQs

Var ska datainläsningskoden placeras?

När du använder det serverlösa GPU-API: et för distribuerad träning flyttar du datainläsningskoden i @distributed dekoratören. Datamängdens storlek kan överskrida den maximala storlek som tillåts av pickle, så vi rekommenderar att du genererar datamängden inuti dekoratören enligt nedan:

from serverless_gpu import distributed

# this may cause pickle error
dataset = get_dataset(file_path)
@distributed(gpus=8, remote=True)
def run_train():
  # good practice
  dataset = get_dataset(file_path)
  ....

Kan jag använda reserverade GPU-pooler?

Om reserverad GPU-pool är tillgänglig (kontakta administratören) på arbetsytan och du anger remote till True i dekoratören @distributed , startas arbetsbelastningen på den reserverade GPU-poolen som standard. Om du vill använda GPU-poolen på begäran anger du miljövariabeln DATABRICKS_USE_RESERVED_GPU_POOL till False innan du anropar den distribuerade funktionen enligt nedan:

import os
os.environ['DATABRICKS_USE_RESERVED_GPU_POOL'] = 'False'
@distributed(gpus=8, remote=True)
def run_train():
    ...

Lära sig mer

Api-referensen finns i dokumentationen för Serverlös GPU Python API .