Distributed GPU training guide (SDK v2)

APPLIES TO: Python SDK azure-ai-ml v2 (current)

Learn more about how to use distributed GPU training code in Azure Machine Learning (ML). This article will not teach you about distributed training. It will help you run your existing distributed training code on Azure Machine Learning. It offers tips and examples for you to follow for each framework:

  • Message Passing Interface (MPI)
    • Horovod
    • Environment variables from Open MPI
  • PyTorch
  • TensorFlow
  • Accelerate GPU training with InfiniBand

Prerequisites

Review these basic concepts of distributed GPU training such as data parallelism, distributed data parallelism, and model parallelism.

Tip

If you don't know which type of parallelism to use, more than 90% of the time you should use Distributed Data Parallelism.

MPI

Azure ML offers an MPI job to launch a given number of processes in each node. Azure ML constructs the full MPI launch command (mpirun) behind the scenes. You can't provide your own full head-node-launcher commands like mpirun or DeepSpeed launcher.

Tip

The base Docker image used by an Azure Machine Learning MPI job needs to have an MPI library installed. Open MPI is included in all the AzureML GPU base images. When you use a custom Docker image, you are responsible for making sure the image includes an MPI library. Open MPI is recommended, but you can also use a different MPI implementation such as Intel MPI. Azure ML also provides curated environments for popular frameworks.

To run distributed training using MPI, follow these steps:

  1. Use an Azure ML environment with the preferred deep learning framework and MPI. Azure ML provides curated environments for popular frameworks.
  2. Define a command with instance_count set to the number of nodes to use. In the distribution settings, process_count_per_instance should be equal to the number of GPUs per node for per-process-launch, or set to 1 (the default) for per-node-launch if the user script will be responsible for launching the processes per node.
  3. Use the distribution parameter of the command to specify settings for MpiDistribution.
from azure.ai.ml import command, MpiDistribution

job = command(
    code="./src",  # local path where the code is stored
    command="python train.py --epochs ${{inputs.epochs}}",
    inputs={"epochs": 1},
    environment="AzureML-tensorflow-2.7-ubuntu20.04-py38-cuda11-gpu@latest",
    compute="gpu-cluster",
    instance_count=2,
    distribution=MpiDistribution(process_count_per_instance=2),
    display_name="tensorflow-mnist-distributed-horovod-example"
    # experiment_name: tensorflow-mnist-distributed-horovod-example
    # description: Train a basic neural network with TensorFlow on the MNIST dataset, distributed via Horovod.
)

Horovod

Use the MPI job configuration when you use Horovod for distributed training with the deep learning framework.

Make sure your code follows these tips:

  • The training code is instrumented correctly with Horovod before adding the Azure ML parts
  • Your Azure ML environment contains Horovod and MPI. The PyTorch and TensorFlow curated GPU environments come pre-configured with Horovod and its dependencies.
  • Create a command with your desired distribution.

Horovod example

Environment variables from Open MPI

When running MPI jobs with Open MPI images, the following environment variables are available for each process launched:

  1. OMPI_COMM_WORLD_RANK - the rank of the process
  2. OMPI_COMM_WORLD_SIZE - the world size
  3. AZ_BATCH_MASTER_NODE - primary address with port, MASTER_ADDR:MASTER_PORT
  4. OMPI_COMM_WORLD_LOCAL_RANK - the local rank of the process on the node
  5. OMPI_COMM_WORLD_LOCAL_SIZE - number of processes on the node

Tip

Despite the name, the environment variable OMPI_COMM_WORLD_NODE_RANK does not correspond to the NODE_RANK. To use per-node-launcher, set process_count_per_instance=1 and use OMPI_COMM_WORLD_RANK as the NODE_RANK.
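
As a minimal sketch of how a training script might read these variables, the following collects them from a mapping such as os.environ and splits AZ_BATCH_MASTER_NODE into an address and a port. The helper name read_open_mpi_env and the dictionary keys are illustrative and not part of any Azure ML or Open MPI API, and the example values are made up.

```python
import os
from typing import Mapping


def read_open_mpi_env(env: Mapping[str, str] = os.environ) -> dict:
    """Collect the Open MPI variables listed above into a plain dict.

    Hypothetical helper for illustration; assumes AZ_BATCH_MASTER_NODE
    has the form "address:port".
    """
    # rpartition splits on the last colon, so the port is always the final field.
    master_addr, _, master_port = env["AZ_BATCH_MASTER_NODE"].rpartition(":")
    return {
        "rank": int(env["OMPI_COMM_WORLD_RANK"]),
        "world_size": int(env["OMPI_COMM_WORLD_SIZE"]),
        "local_rank": int(env["OMPI_COMM_WORLD_LOCAL_RANK"]),
        "local_size": int(env["OMPI_COMM_WORLD_LOCAL_SIZE"]),
        "master_addr": master_addr,
        "master_port": int(master_port),
    }


# Made-up values standing in for one process of a 2-node job with 2 processes per node.
example_env = {
    "OMPI_COMM_WORLD_RANK": "3",
    "OMPI_COMM_WORLD_SIZE": "4",
    "OMPI_COMM_WORLD_LOCAL_RANK": "1",
    "OMPI_COMM_WORLD_LOCAL_SIZE": "2",
    "AZ_BATCH_MASTER_NODE": "10.0.0.4:6000",
}
print(read_open_mpi_env(example_env))
```

Inside a real job, calling read_open_mpi_env() with no arguments reads the values from the process environment instead.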

PyTorch

Azure ML supports running distributed jobs using PyTorch's native distributed training capabilities (torch.distributed).

Tip

For data parallelism, the official PyTorch guidance is to use DistributedDataParallel (DDP) over DataParallel for both single-node and multi-node distributed training. PyTorch also recommends using DistributedDataParallel over the multiprocessing package. Azure Machine Learning documentation and examples will therefore focus on DistributedDataParallel training.

Process group initialization

The backbone of any distributed training is a group of processes that know each other and can communicate with each other using a backend. For PyTorch, the process group is created by calling torch.distributed.init_process_group in all distributed processes to collectively form a process group.

torch.distributed.init_process_group(backend='nccl', init_method='env://', ...)

The most common communication backends used are mpi, nccl, and gloo. For GPU-based training nccl is recommended for best performance and should be used whenever possible.

init_method specifies how the processes discover each other and how they initialize and verify the process group using the communication backend. By default, if init_method is not specified, PyTorch uses the environment variable initialization method (env://). env:// is the recommended initialization method to use in your training code to run distributed PyTorch on Azure ML. PyTorch will look for the following environment variables for initialization:

  • MASTER_ADDR - IP address of the machine that will host the process with rank 0.
  • MASTER_PORT - A free port on the machine that will host the process with rank 0.
  • WORLD_SIZE - The total number of processes. Should be equal to the total number of devices (GPU) used for distributed training.
  • RANK - The (global) rank of the current process. The possible values are 0 to (world size - 1).

For more information on process group initialization, see the PyTorch documentation.

Beyond these, many applications will also need the following environment variables:

  • LOCAL_RANK - The local (relative) rank of the process within the node. The possible values are 0 to (# of processes on the node - 1). This information is useful because many operations, such as data preparation, should be performed only once per node, usually on local_rank = 0.
  • NODE_RANK - The rank of the node for multi-node training. The possible values are 0 to (total # of nodes - 1).
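
The following is a minimal sketch of how a training script might consume these variables. The names DistEnv, read_dist_env, init_distributed, and is_node_leader are illustrative and not part of PyTorch or Azure ML. The sketch assumes that PyTorch with CUDA support is installed in the job environment and that MASTER_ADDR, MASTER_PORT, WORLD_SIZE, RANK, and LOCAL_RANK are already set for the process.

```python
import os
from typing import Mapping, NamedTuple


class DistEnv(NamedTuple):
    rank: int
    world_size: int
    local_rank: int


def read_dist_env(env: Mapping[str, str] = os.environ) -> DistEnv:
    """Read the per-process variables; the defaults cover a single-process run."""
    return DistEnv(
        rank=int(env.get("RANK", "0")),
        world_size=int(env.get("WORLD_SIZE", "1")),
        local_rank=int(env.get("LOCAL_RANK", "0")),
    )


def init_distributed(dist_env: DistEnv) -> None:
    """Form the process group with env:// and pin this process to one GPU."""
    # Imported here so the parsing helper above works without PyTorch installed.
    import torch
    import torch.distributed as dist

    # env:// makes PyTorch read MASTER_ADDR, MASTER_PORT, RANK, and WORLD_SIZE.
    dist.init_process_group(backend="nccl", init_method="env://")
    # One GPU per process: the local rank selects the device on this node.
    torch.cuda.set_device(dist_env.local_rank)


def is_node_leader(dist_env: DistEnv) -> bool:
    """True for the one process per node that should do once-per-node work."""
    return dist_env.local_rank == 0
```

In a training script, a call such as init_distributed(read_dist_env()) would run once at startup, and once-per-node steps such as data preparation would be guarded with is_node_leader.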

You don't need to use a launcher utility like torch.distributed.launch. To run a distributed PyTorch job:

  1. Specify the training script and arguments
  2. Create a command and specify the type as PyTorch and the process_count_per_instance in the distribution parameter. The process_count_per_instance corresponds to the number of processes you want to run on each node. process_count_per_instance should typically equal the number of GPUs per node. If process_count_per_instance isn't specified, Azure ML will by default launch one process per node.

Azure ML will set the MASTER_ADDR, MASTER_PORT, WORLD_SIZE, and NODE_RANK environment variables on each node, and set the process-level RANK and LOCAL_RANK environment variables.
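
As a rough illustration of how these values relate, the sketch below derives the world size and a global rank from the node count and the per-node process count. The function names are hypothetical, and the contiguous rank layout (node 0 holds the first block of ranks, node 1 the next, and so on) is an assumption made for illustration. In a real job, read RANK from the environment rather than computing it.

```python
def world_size(instance_count: int, process_count_per_instance: int) -> int:
    """Total number of processes across all nodes."""
    return instance_count * process_count_per_instance


def global_rank(node_rank: int, local_rank: int, process_count_per_instance: int) -> int:
    """Global rank under the assumed contiguous per-node layout."""
    if not 0 <= local_rank < process_count_per_instance:
        raise ValueError("local_rank must be in [0, process_count_per_instance)")
    return node_rank * process_count_per_instance + local_rank


# Two nodes with four GPUs each: eight processes in total.
print(world_size(instance_count=2, process_count_per_instance=4))  # 8
# The process with local rank 1 on the node with node rank 1.
print(global_rank(node_rank=1, local_rank=1, process_count_per_instance=4))  # 5
```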

from azure.ai.ml import command
from azure.ai.ml.entities import Data
from azure.ai.ml import Input
from azure.ai.ml import Output
from azure.ai.ml.constants import AssetTypes

# === Note on path ===
# The path can be a local path or a cloud path. AzureML supports `https://`, `abfss://`, `wasbs://` and `azureml://` URIs.
# Local paths are automatically uploaded to the default datastore in the cloud.
# More details on supported paths: https://docs.microsoft.com/azure/machine-learning/how-to-read-write-data-v2#supported-paths

inputs = {
    "cifar": Input(
        # returned_job is assumed to be an earlier job (not shown here) whose output holds the CIFAR data.
        type=AssetTypes.URI_FOLDER, path=returned_job.outputs.cifar.path
    ),
    # Alternative paths:
    # path="azureml:azureml_stoic_cartoon_wgb3lgvgky_output_data_cifar:1"
    # path="azureml://datastores/workspaceblobstore/paths/azureml/stoic_cartoon_wgb3lgvgky/cifar/"
    "epoch": 10,
    "batchsize": 64,
    "workers": 2,
    "lr": 0.01,
    "momen": 0.9,
    "prtfreq": 200,
    "output": "./outputs",
}

job = command(
    code="./src",  # local path where the code is stored
    command="python train.py --data-dir ${{inputs.cifar}} --epochs ${{inputs.epoch}} --batch-size ${{inputs.batchsize}} --workers ${{inputs.workers}} --learning-rate ${{inputs.lr}} --momentum ${{inputs.momen}} --print-freq ${{inputs.prtfreq}} --model-dir ${{inputs.output}}",
    inputs=inputs,
    environment="azureml:AzureML-pytorch-1.9-ubuntu18.04-py37-cuda11-gpu:6",
    compute="gpu-cluster",  # Change the name to the gpu cluster of your workspace.
    instance_count=2,  # This example assumes a cluster with only 2 nodes.
    distribution={
        "type": "PyTorch",
        # set process count to the number of gpus per node
        # NV6 has only 1 GPU
        "process_count_per_instance": 1,
    },
)

PyTorch example

DeepSpeed

DeepSpeed is supported as a first-class citizen within Azure Machine Learning to run distributed jobs with near-linear scalability in terms of:

  • Increase in model size
  • Increase in number of GPUs

DeepSpeed can be enabled using either the PyTorch distribution or MPI for running distributed training. Azure Machine Learning supports the DeepSpeed launcher to launch distributed training, as well as autotuning to find an optimal DeepSpeed (ds) configuration.

You can use a curated environment for an out-of-the-box setup with the latest state-of-the-art technologies, including DeepSpeed, ORT, MSSCCL, and PyTorch, for your DeepSpeed training jobs.

DeepSpeed example

  • For DeepSpeed training and autotuning examples, see these folders.

TensorFlow

If you're using native distributed TensorFlow in your training code, such as TensorFlow 2.x's tf.distribute.Strategy API, you can launch the distributed job via Azure ML using distribution parameters or the TensorFlowDistribution object.

from azure.ai.ml import command, TensorFlowDistribution

# create the command
job = command(
    code="./src",  # local path where the code is stored
    command="python main.py --epochs ${{inputs.epochs}} --model-dir ${{inputs.model_dir}}",
    inputs={"epochs": 1, "model_dir": "outputs/keras-model"},
    environment="AzureML-tensorflow-2.4-ubuntu18.04-py37-cuda11-gpu@latest",
    compute="cpu-cluster",
    instance_count=2,
    # distribution = {"type": "mpi", "process_count_per_instance": 1},
    distribution={
        "type": "tensorflow",
        "parameter_server_count": 1,
        "worker_count": 2,
        "added_property": 7,
    },
    # distribution = {
    #        "type": "pytorch",
    #        "process_count_per_instance": 4,
    #        "additional_prop": {"nested_prop": 3},
    #    },
    display_name="tensorflow-mnist-distributed-example"
    # experiment_name: tensorflow-mnist-distributed-example
    # description: Train a basic neural network with TensorFlow on the MNIST dataset, distributed via TensorFlow.
)

# can also set the distribution in a separate step and using the typed objects instead of a dict
job.distribution = TensorFlowDistribution(parameter_server_count=1, worker_count=2)

If your training script uses the parameter server strategy for distributed training, such as for legacy TensorFlow 1.x, you'll also need to specify the number of parameter servers to use in the job, inside the distribution parameter of the command. The example above sets "parameter_server_count": 1 and "worker_count": 2.

TF_CONFIG

In TensorFlow, the TF_CONFIG environment variable is required for training on multiple machines. For TensorFlow jobs, Azure ML will configure and set the TF_CONFIG variable appropriately for each worker before executing your training script.

You can access TF_CONFIG from your training script if you need to: os.environ['TF_CONFIG'].

Example TF_CONFIG set on a chief worker node:

TF_CONFIG='{
    "cluster": {
        "worker": ["host0:2222", "host1:2222"]
    },
    "task": {"type": "worker", "index": 0},
    "environment": "cloud"
}'
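
As a minimal sketch of reading this value from a training script, the following parses the TF_CONFIG JSON string and summarizes the role of the current process. The helper name describe_tf_config and the returned keys are illustrative, not part of TensorFlow or Azure ML. The sketch assumes that parameter servers, when present, appear under the "ps" key of the cluster entry.

```python
import json
import os
from typing import Mapping


def describe_tf_config(env: Mapping[str, str] = os.environ) -> dict:
    """Summarize this process's role from the TF_CONFIG JSON string."""
    tf_config = json.loads(env["TF_CONFIG"])
    cluster = tf_config["cluster"]
    task = tf_config["task"]
    return {
        "task_type": task["type"],
        "task_index": task["index"],
        "num_workers": len(cluster.get("worker", [])),
        # Assumed key for parameter servers; absent in the example above.
        "num_parameter_servers": len(cluster.get("ps", [])),
    }


# The example value shown above, as the process would see it.
example_env = {
    "TF_CONFIG": json.dumps(
        {
            "cluster": {"worker": ["host0:2222", "host1:2222"]},
            "task": {"type": "worker", "index": 0},
            "environment": "cloud",
        }
    )
}
print(describe_tf_config(example_env))
```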

TensorFlow example

Accelerating distributed GPU training with InfiniBand

As the number of VMs training a model increases, the time required to train that model should decrease. The decrease in time, ideally, should be linearly proportional to the number of training VMs. For instance, if training a model on one VM takes 100 seconds, then training the same model on two VMs should ideally take 50 seconds. Training the model on four VMs should take 25 seconds, and so on.
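
The arithmetic above can be sketched as follows. The function names are hypothetical, and the 40-second measurement is an invented figure used only to show how a real run might compare against the ideal.

```python
def ideal_time(single_vm_seconds: float, vm_count: int) -> float:
    """Training time under perfect linear scaling."""
    return single_vm_seconds / vm_count


def scaling_efficiency(single_vm_seconds: float, vm_count: int, measured_seconds: float) -> float:
    """Ratio of the ideal time to the measured time; 1.0 means perfect linear scaling."""
    return ideal_time(single_vm_seconds, vm_count) / measured_seconds


print(ideal_time(100, 2))  # 50.0
print(ideal_time(100, 4))  # 25.0
# Invented measurement: four VMs that take 40 seconds instead of the ideal 25.
print(scaling_efficiency(100, 4, 40))  # 0.625
```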

InfiniBand can be an important factor in attaining this linear scaling. InfiniBand enables low-latency, GPU-to-GPU communication across nodes in a cluster. InfiniBand requires specialized hardware to operate. Certain Azure VM series, specifically the NC, ND, and H-series, now have RDMA-capable VMs with SR-IOV and InfiniBand support. These VMs communicate over the low-latency, high-bandwidth InfiniBand network, which is much more performant than Ethernet-based connectivity. SR-IOV for InfiniBand enables near bare-metal performance for any MPI library. (MPI is used by many distributed training frameworks and tooling, including NVIDIA's NCCL software.) These SKUs are intended to meet the needs of computationally intensive, GPU-accelerated machine learning workloads. For more information, see Accelerating Distributed Training in Azure Machine Learning with SR-IOV.

Typically, VM SKUs with an 'r' in their name contain the required InfiniBand hardware, and those without an 'r' typically do not. ('r' is a reference to RDMA, which stands for "remote direct memory access.") For instance, the VM SKU Standard_NC24rs_v3 is InfiniBand-enabled, but the SKU Standard_NC24s_v3 is not. Aside from the InfiniBand capabilities, the specs between these two SKUs are largely the same – both have 24 cores, 448 GB RAM, 4 GPUs of the same SKU, etc. Learn more about RDMA- and InfiniBand-enabled machine SKUs.
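
The naming rule of thumb can be sketched as a small check. This is only a heuristic based on the naming pattern described above: the function name and the regular expression are illustrative assumptions, and the result says nothing about whether a given SKU has the SR-IOV hardware required for InfiniBand. Confirm capabilities against the SKU documentation.

```python
import re

# Matches names such as Standard_NC24rs_v3: family letters, size digits, then feature letters.
_SKU_PATTERN = re.compile(r"^Standard_[A-Z]+\d+(?P<features>[a-z]*)")


def looks_rdma_capable(sku_name: str) -> bool:
    """Return True if the feature letters after the size digits include an 'r'."""
    match = _SKU_PATTERN.match(sku_name)
    if match is None:
        raise ValueError(f"unrecognized SKU name: {sku_name!r}")
    return "r" in match.group("features")


print(looks_rdma_capable("Standard_NC24rs_v3"))  # True
print(looks_rdma_capable("Standard_NC24s_v3"))   # False
```

The check looks only at the letters that follow the size digits, so the 'r' in "Standard" does not count.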

Warning

The older-generation machine SKU Standard_NC24r is RDMA-enabled, but it does not contain SR-IOV hardware required for InfiniBand.

If you create an AmlCompute cluster of one of these RDMA-capable, InfiniBand-enabled sizes, the OS image will come with the Mellanox OFED driver required to enable InfiniBand preinstalled and preconfigured.

Next steps