Distribuer l’entraînement PyTorch avec Horovod

Effectué

PyTorch, en commun avec d’autres infrastructures de Deep Learning comme TensorFlow, est conçu pour être mis à l’échelle sur plusieurs processeurs (unités centrales ou processeurs graphiques) sur un seul ordinateur. Dans la plupart des cas, cette approche du scale-up à l’aide d’ordinateurs dotés de processeurs plus nombreux ou plus rapides offre des performances d’entraînement adéquates. Toutefois, lorsque vous devez travailler avec des réseaux neuronaux extrêmement complexes ou de grands volumes de données d’entraînement, vous pouvez tirer parti de la capacité inhérente d’Apache Spark à effectuer un scale-out des tâches de traitement sur plusieurs nœuds worker.

Azure Databricks utilise des clusters Spark qui peuvent inclure plusieurs nœuds worker. En outre, les runtimes Databricks de Machine Learning dans Azure Databricks incluent Horovod et une bibliothèque open source qui vous permet de distribuer des tâches d’entraînement Machine Learning sur les nœuds d’un cluster.

Créer une fonction d’entraînement

La première étape d’utilisation de Horovod consiste à créer une fonction qui encapsule le code que vous utilisez pour appeler votre fonction d’entraînement. Le code de cette fonction « externe » peut tirer parti de différentes classes et fonctions Horovod pour distribuer des données et l’état de l’optimiseur sur tous les nœuds impliqués dans le processus d’entraînement.

L’exemple de code suivant montre une fonction permettant de gérer l’exécution d’un entraînement distribué.

import horovod.torch as hvd
from sparkdl import HorovodRunner

def train_hvd(model):
    from torch.utils.data.distributed import DistributedSampler
    
    hvd.init()
    
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    if device.type == 'cuda':
        # Pin GPU to local rank
        torch.cuda.set_device(hvd.local_rank())
    
    # Configure the sampler so that each worker gets a distinct sample of the input dataset
    train_sampler = DistributedSampler(train_ds, num_replicas=hvd.size(), rank=hvd.rank())
    # Use train_sampler to load a different sample of data on each worker
    train_loader = torch.utils.data.DataLoader(train_ds, batch_size=20, sampler=train_sampler)
    
    loss_criteria = nn.CrossEntropyLoss()
    # Scale the learning_rate based on the number of nodes
    learning_rate = 0.001 * hvd.size()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    
    # Wrap the local optimizer with hvd.DistributedOptimizer so that Horovod handles the distributed optimization
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

    # Broadcast initial parameters so all workers work with the same parameters
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)

    # Train over 50 epochs
    epochs = 50
    for epoch in range(1, epochs + 1):
        print('Epoch: {}'.format(epoch))
        # Feed training data into the model to optimize the weights
        train_loss = train(model, train_loader, optimizer)

    # Save the model weights
    if hvd.rank() == 0:
        model_file = '/dbfs/myModel_hvd.pkl'
        torch.save(model.state_dict(), model_file)
        print('model saved as', model_file)

Exécuter la fonction sur plusieurs nœuds

Une fois que vous avez défini la fonction pour appeler votre fonction d’entraînement, vous pouvez utiliser la classe HorovodRunner pour l’exécuter sur plusieurs nœuds de votre cluster, comme illustré ici :

# Create a new model
hvd_model = MyNet()

# Run the distributed training function on 2 nodes
hr = HorovodRunner(np=2) 
hr.run(train_hvd, model=hvd_model)