Udostępnij za pośrednictwem


Obciążenie z wieloma procesorami GPU

Ważna

Ta funkcja jest dostępna w wersji beta. Administratorzy obszaru roboczego mogą kontrolować dostęp do tej funkcji ze strony Podglądy . Zobacz Zarządzanie wersjami zapoznawczami usługi Azure Databricks.

Obciążenia rozproszone można uruchamiać na wielu procesorach GPU w jednym węźle przy użyciu bezserwerowego interfejsu API języka Python. Interfejs API udostępnia prosty, ujednolicony interfejs, który usuwa szczegóły dotyczące dostarczania GPU, konfiguracji środowiska i dystrybucji obciążeń. Dzięki minimalnym zmianom kodu można bezproblemowo przejść od trenowania z pojedynczym GPU do rozproszonego wykonania z wieloma GPU z tego samego notatnika.

Obsługiwane platformy

Interfejs @distributed API integruje się z głównymi rozproszonymi bibliotekami szkoleniowymi:

  • PyTorch Distributed Data Parallel (DDP) — standardowa równoległość danych z wieloma GPU.
  • Całkowicie równoległe przetwarzanie danych na fragmenty (FSDP) — pamięciooszczędne trenowanie dużych modeli.
  • DeepSpeed — biblioteka optymalizacji firmy Microsoft do trenowania dużych modeli.

interfejs API serverless_gpu vs. TorchDistributor

W poniższej serverless_gpu@distributed tabeli porównaliśmy interfejs API z usługą TorchDistributor:

Funkcja serverless_gpu @distributed API TorchDistributor
Infrastruktura W pełni bezserwerowe, bez zarządzania klastrem Wymaga klastra Spark z pracownikami GPU
Konfiguracja Pojedynczy dekorator, minimalna konfiguracja Wymaga konfiguracji klastra Spark i narzędzia TorchDistributor
Obsługa komponentu Framework PyTorch DDP, FSDP, DeepSpeed Przede wszystkim PyTorch DDP
Ładowanie danych Wewnątrz dekoratora używa woluminów katalogu Unity Za pośrednictwem platformy Spark lub systemu plików

API serverless_gpu jest zalecanym podejściem do nowych zestawów zadań dotyczących uczenia głębokiego na platformie Databricks. TorchDistributor pozostaje dostępny dla obciążeń ściśle powiązanych z klastrami Spark.

Szybki start

Bezserwerowy interfejs API procesora GPU do trenowania rozproszonego jest wstępnie instalowany po połączeniu z bezserwerowym procesorem GPU w notesach i zadaniach usługi Databricks. Zalecamy środowisko procesora GPU 4 i nowsze. Aby użyć go do trenowania rozproszonego, zaimportuj i użyj dekoratora distributed do dystrybucji funkcji trenowania.

Umieść kod trenowania modelu w funkcji i udekoruj funkcję dekoratorem @distributed. Funkcja ozdobiona staje się punktem wejścia do wykonywania rozproszonego — wszystkie logiki trenowania, ładowania danych i inicjowania modelu powinny być zdefiniowane wewnątrz tej funkcji.

Ostrzeżenie

Parametr gpu_type w @distributed musi być zgodny z typem akceleratora, z którym jest połączony notes. Na przykład @distributed(gpus=8, gpu_type='H100') wymaga, aby notes był połączony z akceleratorem H100. Użycie niedopasowanego typu akceleratora (na przykład nawiązywanie połączenia z A10 podczas określania H100) spowoduje niepowodzenie obciążenia.

Poniższy fragment kodu przedstawia podstawowe użycie elementu @distributed:

# Import the distributed decorator
from serverless_gpu import distributed

# Decorate your training function with @distributed and specify the number of GPUs and GPU type
@distributed(gpus=8, gpu_type='H100')
def run_train():
    ...

Poniżej znajduje się pełny przykład, który trenuje model wielowarstwowy perceptron (MLP) na ośmiu GPU H100 z notebooka:

  1. Skonfiguruj model i zdefiniuj funkcje narzędzi.

    
    # Define the model
    import os
    import torch
    import torch.distributed as dist
    import torch.nn as nn
    
    def setup():
        dist.init_process_group("nccl")
        torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    
    def cleanup():
        dist.destroy_process_group()
    
    class SimpleMLP(nn.Module):
        def __init__(self, input_dim=10, hidden_dim=64, output_dim=1):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(input_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, output_dim)
            )
    
        def forward(self, x):
            return self.net(x)
    
  2. Zaimportuj bibliotekę serverless_gpu i moduł rozproszony .

    import serverless_gpu
    from serverless_gpu import distributed
    
  3. Umieść kod trenowania modelu w funkcji i udekoruj funkcję dekoratorem @distributed.

    @distributed(gpus=8, gpu_type='H100')
    def run_train(num_epochs: int, batch_size: int) -> None:
        import mlflow
        import torch.optim as optim
        from torch.nn.parallel import DistributedDataParallel as DDP
        from torch.utils.data import DataLoader, DistributedSampler, TensorDataset
    
        # 1. Set up multi-GPU environment
        setup()
        device = torch.device(f"cuda:{int(os.environ['LOCAL_RANK'])}")
    
        # 2. Apply the Torch distributed data parallel (DDP) library for data-parellel training.
        model = SimpleMLP().to(device)
        model = DDP(model, device_ids=[device])
    
        # 3. Create and load dataset.
        x = torch.randn(5000, 10)
        y = torch.randn(5000, 1)
    
        dataset = TensorDataset(x, y)
        sampler = DistributedSampler(dataset)
        dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size)
    
        # 4. Define the training loop.
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        loss_fn = nn.MSELoss()
    
        for epoch in range(num_epochs):
            sampler.set_epoch(epoch)
            model.train()
            total_loss = 0.0
            for step, (xb, yb) in enumerate(dataloader):
                xb, yb = xb.to(device), yb.to(device)
                optimizer.zero_grad()
                loss = loss_fn(model(xb), yb)
                # Log loss to MLflow metric
                mlflow.log_metric("loss", loss.item(), step=step)
    
                loss.backward()
                optimizer.step()
                total_loss += loss.item() * xb.size(0)
    
            mlflow.log_metric("total_loss", total_loss)
            print(f"Total loss for epoch {epoch}: {total_loss}")
    
        cleanup()
    
  4. Wykonaj trenowanie rozproszone, wywołując funkcję rozproszoną z argumentami zdefiniowanymi przez użytkownika.

    run_train.distributed(num_epochs=3, batch_size=1)
    
  5. Po wykonaniu link do uruchomienia MLflow zostanie wygenerowany w danych wyjściowych komórki notatnika. Kliknij link przebiegu MLflow lub znajdź go w panelu Eksperyment, aby zobaczyć wyniki przebiegu.

Szczegóły wykonania rozproszonego

Bezserwerowy interfejs API procesora GPU składa się z kilku kluczowych składników:

  • Menedżer zasobów obliczeniowych: obsługuje alokację zasobów i zarządzanie nimi
  • Środowisko uruchomieniowe: zarządza środowiskami i zależnościami języka Python
  • Program uruchamiający: orkiestruje wykonywanie i monitorowanie zadań

W przypadku uruchamiania w trybie rozproszonym:

  • Funkcja jest serializowana i dystrybuowana w określonej liczbie procesorów GPU
  • Każdy procesor GPU uruchamia kopię funkcji z tymi samymi parametrami
  • Środowisko jest synchronizowane ze wszystkimi procesorami GPU
  • Wyniki są zbierane i zwracane ze wszystkich procesorów GPU

Interfejs API obsługuje popularne biblioteki trenowania równoległego, takie jak Distributed Data Parallel (DDP), Fully Sharded Data Parallel (FSDP), DeepSpeed.

Bardziej realistyczne scenariusze trenowania rozproszonego można znaleźć przy użyciu różnych bibliotek w przykładach z notesów.

Często zadawane pytania

Gdzie należy umieścić kod ładowania danych?

Podczas korzystania z Serverless GPU API do rozproszonego trenowania, umieść kod ładowania danych wewnątrz dekoratora @distributed. Rozmiar zestawu danych może przekraczać maksymalny rozmiar dozwolony przez pickle, dlatego zaleca się wygenerować zestaw danych wewnątrz dekoratora, co przedstawiono poniżej.

from serverless_gpu import distributed

# this may cause pickle error
dataset = get_dataset(file_path)
@distributed(gpus=8, gpu_type='H100')
def run_train():
  # good practice
  dataset = get_dataset(file_path)
  ....

Dowiedz się więcej

Aby zapoznać się z dokumentacją, sprawdź dokumentację bezserwerowego interfejsu API GPU w języku Python.