
import horovod
horovod.__version__
Out[1]: '0.23.0'

Distributed deep learning training using TensorFlow and Keras with HorovodRunner

This notebook illustrates the use of HorovodRunner for distributed training with the tensorflow.keras API. It first shows how to train a model on a single node, and then shows how to adapt the code using HorovodRunner for distributed training. The notebook runs on CPU and GPU clusters.


Requirements

Databricks Runtime 7.0 ML or above.
HorovodRunner is designed to improve model training performance on clusters with multiple workers, but multiple workers are not required to run this notebook.

Create function to prepare data

The get_dataset() function prepares the data for training. This function takes in rank and size arguments so it can be used for both single-node and distributed training. In Horovod, rank is a unique process ID and size is the total number of processes.
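The rank::size slicing used in get_dataset() gives each process every size-th example, starting at its own rank. The shards therefore do not overlap, and together they cover the whole dataset. The minimal pure-Python sketch below shows this. It uses a hypothetical list of 10 examples in place of MNIST, and the helper name shard is illustrative only.

```python
def shard(examples, rank, size):
    """Return the examples assigned to one process (same slicing as get_dataset)."""
    return examples[rank::size]

examples = list(range(10))  # hypothetical stand-in for the training set
size = 3                    # number of processes
shards = [shard(examples, rank, size) for rank in range(size)]
print(shards)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]

# Every example lands in exactly one shard.
assert sorted(x for s in shards for x in s) == examples
```

With size=1 (single-node training), rank 0 receives the full dataset. The test split is sharded the same way, so each process validates on its own slice.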

This function downloads the MNIST data from keras.datasets and shards it across the Horovod processes, so that each process trains on a different subset. It then converts the data to the shapes and types needed for training.

def get_dataset(num_classes, rank=0, size=1):
  from tensorflow import keras
  (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data('MNIST-data-%d' % rank)
  x_train = x_train[rank::size]
  y_train = y_train[rank::size]
  x_test = x_test[rank::size]
  y_test = y_test[rank::size]
  x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
  x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
  x_train = x_train.astype('float32')
  x_test = x_test.astype('float32')
  x_train /= 255
  x_test /= 255
  y_train = keras.utils.to_categorical(y_train, num_classes)
  y_test = keras.utils.to_categorical(y_test, num_classes)
  return (x_train, y_train), (x_test, y_test)

Create function to define the model

The get_model() function defines the model using the tensorflow.keras API. This code is adapted from the Keras MNIST convnet example.

def get_model(num_classes):
  from tensorflow.keras import models
  from tensorflow.keras import layers
  model = models.Sequential()
  model.add(layers.Conv2D(32, kernel_size=(3, 3),
                          activation='relu',
                          input_shape=(28, 28, 1)))
  model.add(layers.Conv2D(64, (3, 3), activation='relu'))
  model.add(layers.MaxPooling2D(pool_size=(2, 2)))
  model.add(layers.Dropout(0.25))
  # Flatten the feature maps so the Dense layers receive one vector per image
  model.add(layers.Flatten())
  model.add(layers.Dense(128, activation='relu'))
  model.add(layers.Dropout(0.5))
  model.add(layers.Dense(num_classes, activation='softmax'))
  return model
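You can check the layer shapes and parameter counts of this convnet by hand. The sketch below assumes the Keras defaults (stride 1, 'valid' padding). It also assumes a Flatten layer between pooling and the first Dense layer, as in the Keras MNIST convnet example. Dropout, pooling, and Flatten layers add no parameters. The helper functions are illustrative only and are not part of Keras.

```python
def conv_out(size, kernel):
    """Output width of a 'valid' (unpadded), stride-1 convolution."""
    return size - kernel + 1

def conv_params(kernel, in_ch, out_ch):
    return kernel * kernel * in_ch * out_ch + out_ch  # weights + biases

def dense_params(n_in, n_out):
    return n_in * n_out + n_out  # weights + biases

num_classes = 10
side = conv_out(28, 3)      # first Conv2D: 28 -> 26
side = conv_out(side, 3)    # second Conv2D: 26 -> 24
side = side // 2            # MaxPooling2D(2, 2): 24 -> 12
flat = side * side * 64     # Flatten: 12 * 12 * 64 = 9216

params = (conv_params(3, 1, 32) + conv_params(3, 32, 64)
          + dense_params(flat, 128) + dense_params(128, num_classes))
print(side, flat, params)   # 12 9216 1199882
```

Nearly all of the parameters sit in the first Dense layer (9216 * 128 + 128 = 1,179,776). That is why a Flatten layer is needed before it: without one, Dense would be applied per pixel position instead of per image.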

Run training on single node

The train() function in the following cell defines single-node training code with tensorflow.keras.

# Specify training parameters
batch_size = 128
epochs = 1
num_classes = 10
def train(learning_rate=1.0):
  from tensorflow import keras
  (x_train, y_train), (x_test, y_test) = get_dataset(num_classes)
  model = get_model(num_classes)
  # Specify the optimizer (Adadelta in this example). The learning rate is a function parameter so that the Horovod version of this function can scale it by the number of processes
  optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)
  # Compile with a loss that matches the one-hot labels returned by get_dataset()
  model.compile(optimizer=optimizer,
                loss='categorical_crossentropy',
                metrics=['accuracy'])
  model.fit(x_train, y_train,
            batch_size=batch_size,
            epochs=epochs,
            verbose=2,
            validation_data=(x_test, y_test))
  return model

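Run the train() function to train a model on the driver node. This can take a few minutes, depending on the cluster and on the time needed to download MNIST. With epochs = 1, training makes a single pass over the data. Training for more epochs typically improves accuracy.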

model = train(learning_rate=0.1)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
469/469 - 11s - loss: 0.6214 - accuracy: 0.8098 - val_loss: 0.2215 - val_accuracy: 0.9351 - 11s/epoch - 24ms/step
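The 469/469 in the log is the number of batches per epoch. MNIST has 60,000 training images, and with batch_size = 128 Keras runs ceil(60000 / 128) steps, the last one on a partial batch:

```python
import math

train_examples = 60000  # size of the MNIST training split
batch_size = 128
steps = math.ceil(train_examples / batch_size)
print(steps)  # 469 (the last batch holds 60000 - 468 * 128 = 96 examples)
```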

Evaluate the model on the test set and print the loss and accuracy.

_, (x_test, y_test) = get_dataset(num_classes)
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)
79/79 [==============================] - 0s 3ms/step - loss: 0.2215 - accuracy: 0.9351
loss: 0.22154481709003448
accuracy: 0.9351000189781189

Migrate to HorovodRunner for distributed training

This section shows how to modify the single-node code to use Horovod. For more information about Horovod, see the Horovod documentation.
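The core idea behind hvd.DistributedOptimizer works as follows. Each process computes gradients on its own data shard. The gradients are averaged across all processes with an allreduce, and every process applies the same averaged update, so the model replicas stay in sync. The pure-Python simulation below illustrates only that averaging step; the function name and gradient values are hypothetical. It is not Horovod's implementation, which performs the allreduce over the network (for example, with ring-allreduce).

```python
def allreduce_average(per_worker_grads):
    """Average each gradient element across workers (the result an averaging allreduce delivers to every worker)."""
    n = len(per_worker_grads)
    return [sum(vals) / n for vals in zip(*per_worker_grads)]

# Hypothetical gradients for a 3-parameter model, computed on two different data shards.
grads_rank0 = [0.2, -0.4, 1.0]
grads_rank1 = [0.4, 0.0, 3.0]
avg = allreduce_average([grads_rank0, grads_rank1])
print(avg)  # approximately [0.3, -0.2, 2.0]; both workers apply this same update
```

Identical updates keep the replicas identical only if they start from the same weights. This is why the distributed code also broadcasts the initial variables from rank 0.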

First, create a directory to save model checkpoints.

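import os
import time
# Remove any existing checkpoint files
dbutils.fs.rm("/ml/MNISTDemo/train", recurse=True)
# Create a new timestamped checkpoint directory on DBFS (reachable from every node through the /dbfs mount)
checkpoint_dir = '/dbfs/ml/MNISTDemo/train/{}/'.format(time.time())
os.makedirs(checkpoint_dir)
print(checkpoint_dir)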
Next, define train_hvd(), the distributed version of train(). HorovodRunner runs this function in every training process. Compared with train(), it does the following:
- initializes Horovod and pins one GPU per process
- shards the data by rank and scales the learning rate
- wraps the optimizer in hvd.DistributedOptimizer
- broadcasts the initial weights from rank 0
- saves checkpoints only on rank 0

It can also record a Horovod timeline on local disk and copy it to DBFS afterward.

def train_hvd(checkpoint_path=None, learning_rate=1.0, timeline=None):
    # Import tensorflow modules to each worker
    import os
    import shutil
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd
    # If a timeline path is on /dbfs, write it to local /tmp on the worker and copy it back at the end
    worker_timeline = None
    dbfs_timeline = None
    if timeline and timeline.startswith("/dbfs"):
        dbfs_timeline = timeline
        worker_timeline = "/tmp" + timeline[5:]
        timeline = worker_timeline
        os.makedirs(os.path.dirname(worker_timeline), exist_ok=True)
    if os.environ.get("HOROVOD_TIMELINE", "").startswith("/dbfs"):
        dbfs_timeline = os.environ["HOROVOD_TIMELINE"]
        worker_timeline = "/tmp" + os.environ["HOROVOD_TIMELINE"][5:]
        os.environ["HOROVOD_TIMELINE"] = worker_timeline
        os.makedirs(os.path.dirname(worker_timeline), exist_ok=True)
    print(f"dbfs_timeline:{dbfs_timeline} worker_timeline:{worker_timeline}")
    # Initialize Horovod (it reads HOROVOD_TIMELINE, so the redirect above must happen first)
    hvd.init()
    if timeline:
        # Start recording to the timeline path passed as an argument
        hvd.start_timeline(timeline)
    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(num_classes, hvd.rank(), hvd.size())
    model = get_model(num_classes)
    # Scale the learning rate by the number of processes
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate * hvd.size())
    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)
    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]
    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0 and checkpoint_path:
        callbacks.append(keras.callbacks.ModelCheckpoint(
            os.path.join(checkpoint_path, 'checkpoint-{epoch}.ckpt'), save_weights_only=True))
    model.fit(x_train, y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    # Stop the timeline started above, then copy the local timeline file (if this process wrote one) back to DBFS
    if timeline:
        hvd.stop_timeline()
    if dbfs_timeline and worker_timeline and os.path.exists(worker_timeline):
        print(f"Copying: from: {worker_timeline} to {dbfs_timeline}")
        shutil.copy(worker_timeline, dbfs_timeline)
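The learning-rate scaling in train_hvd() follows from how the data is split. With np processes, each running batches of batch_size, one synchronized step averages gradients over batch_size * np examples. Horovod's documentation therefore suggests scaling the learning rate by the number of workers, which here means multiplying it by hvd.size(). The hypothetical helper below (not part of Horovod) works through the numbers for this notebook's settings: 60,000 training images, batch_size = 128, learning_rate=0.1, and np=2.

```python
import math

def horovod_schedule(train_examples, batch_size, base_lr, size):
    """Per-process view of a data-parallel run where each process gets examples[rank::size]."""
    shard = math.ceil(train_examples / size)  # largest shard (rank 0)
    return {
        "scaled_lr": base_lr * size,             # as in Adadelta(learning_rate * hvd.size())
        "global_batch": batch_size * size,       # examples consumed per synchronized step
        "steps_per_epoch": math.ceil(shard / batch_size),
    }

print(horovod_schedule(60000, 128, 0.1, 2))
```

With two processes, each one runs 235 steps per epoch on 30,000 images instead of 469 steps on 60,000, at twice the learning rate.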
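Create a HorovodRunner with np=2 and run train_hvd() on the cluster. This run records a Horovod timeline to DBFS and saves checkpoints to the directory created earlier.

import os
import time
from sparkdl import HorovodRunner
# Record a Horovod timeline; train_hvd() redirects it to local /tmp on each worker and copies it back here
timeline_file = f'/dbfs/ml/hvd_timeline_{time.time()}.json'
os.makedirs(os.path.dirname(timeline_file), exist_ok=True)
os.environ['HOROVOD_TIMELINE'] = timeline_file
# Run train_hvd() in 2 processes and stream all worker logs to the notebook
hr = HorovodRunner(np=2, driver_log_verbosity='all')
hr.run(train_hvd, checkpoint_path=checkpoint_dir, learning_rate=0.1)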
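train_hvd() redirects a /dbfs timeline path to the worker's local /tmp and copies the file back after training. You can factor that path handling into two small helpers if you reuse it in other training functions. The names to_worker_path and copy_back are hypothetical and are not part of Horovod or Databricks. This version also checks for a "/dbfs/" prefix, which is slightly stricter than the "/dbfs" check in train_hvd().

```python
import os
import shutil

def to_worker_path(path, dbfs_prefix="/dbfs", local_root="/tmp"):
    """Map /dbfs/... to a local path, like '/tmp' + timeline[5:] in train_hvd()."""
    if not path.startswith(dbfs_prefix + "/"):
        return path  # not on DBFS: write to it directly
    return local_root + path[len(dbfs_prefix):]

def copy_back(local_path, dbfs_path):
    """Copy a finished local file to its DBFS destination if this process wrote it."""
    if not os.path.exists(local_path):
        return False  # e.g. a rank that did not write a timeline
    os.makedirs(os.path.dirname(dbfs_path), exist_ok=True)
    shutil.copy(local_path, dbfs_path)
    return True

print(to_worker_path("/dbfs/ml/hvd_timeline_1.json"))  # /tmp/ml/hvd_timeline_1.json
```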
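View the timeline file to inspect the Horovod training timeline data. The file name includes the timestamp of your run, so substitute the path stored in timeline_file. The file uses the Chrome trace format, so you can also load it in chrome://tracing.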

%sh cat /dbfs/ml/hvd_timeline_1643218354.8848794.json
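To summarize the timeline in Python instead of reading raw JSON, parse it as a Chrome trace in the JSON Array Format. A file copied while it was still being written may end with a trailing comma or lack the closing ], and json.loads rejects both. The helper names and sample events below are hypothetical; a real Horovod timeline has its own event names and fields.

```python
import json
from collections import Counter

def load_trace_events(text):
    """Parse a Chrome-trace JSON array, tolerating a trailing comma or a missing ']'."""
    body = text.strip()
    if body.endswith("]"):
        body = body[:-1]
    body = body.rstrip().rstrip(",")
    return json.loads(body + "]")

def count_by_name(events):
    """Count trace events by their 'name' field."""
    return Counter(e.get("name", "?") for e in events if isinstance(e, dict))

# Hypothetical, truncated timeline content (event names are made up for the example).
sample = ('[\n{"name": "tensor_a", "ph": "B", "ts": 1, "pid": 1},\n'
          '{"name": "tensor_a", "ph": "E", "ts": 5, "pid": 1},\n'
          '{"name": "tensor_b", "ph": "B", "ts": 6, "pid": 2},\n')
events = load_trace_events(sample)
print(count_by_name(events))  # Counter({'tensor_a': 2, 'tensor_b': 1})
```

In the notebook, you would call load_trace_events(open(timeline_file).read()) in the same session that defined timeline_file.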
