Tutorial: Entrenamiento distribuido con Horovod Runner y TensorFlow (versión preliminar)

Horovod es un marco de entrenamiento distribuido para librerías como TensorFlow y PyTorch. Con Horovod, los usuarios pueden escalar verticalmente un script de entrenamiento existente para ejecutarse en cientos de GPU en unas pocas líneas de código.

En Azure Synapse Analytics, los usuarios pueden empezar a trabajar rápidamente con Horovod mediante el entorno de ejecución predeterminado de Apache Spark 3. En el caso de las aplicaciones de canalización de Spark ML que usan TensorFlow, los usuarios pueden emplear HorovodRunner. Este cuaderno usa un dataframe de Apache Spark para realizar el entrenamiento distribuido de un modelo de red neuronal distribuida (DNN) en el conjunto de datos de MNIST. En este tutorial se usa TensorFlow y HorovodRunner para ejecutar el proceso de entrenamiento.

Requisitos previos

  • Necesitará un área de trabajo de Azure Synapse Analytics con una cuenta de almacenamiento de Azure Data Lake Storage Gen2 que esté configurada como almacenamiento predeterminado. Asegúrese de que es el colaborador de datos de Storage Blob en el sistema de archivos de Data Lake Storage Gen2 con el que trabaja.
  • Cree un grupo de Apache Spark habilitado para GPU en el área de trabajo de Azure Synapse Analytics. Para más información, consulte Creación de un grupo de Apache Spark habilitado para GPU en Azure Synapse. En este tutorial, se recomienda usar el tamaño del clúster de GPU grande con 3 nodos.

Advertencia

  • La versión preliminar acelerada por GPU se limita a los entornos de ejecución de Azure Synapse 3.1 (no admitido) y Apache Spark 3.2 (Finalización del soporte técnico anunciada).
  • El 26 de enero de 2023 el entorno de ejecución de Azure Synapse para Apache Spark 3.1 ha alcanzado su fin de soporte técnico, por lo que a partir del 26 de enero de 2024 se interrumpirá el soporte técnico oficial y no se atenderán más incidencias de soporte técnico, correcciones de errores ni actualizaciones de seguridad después de esta fecha.
  • Final del soporte técnico anunciado para Azure Synapse Runtime para Apache Spark 3.2 se ha anunciado el 8 de julio de 2023. El final del soporte técnico anunciado en tiempo de ejecución no tendrá correcciones de errores y características. Las correcciones de seguridad se realizarán en función de la evaluación de riesgos. Este tiempo de ejecución se retirará y deshabilitará a partir del 8 de julio de 2024.

Configuración de la sesión de Apache Spark

Al principio de la sesión, es necesario configurar algunas opciones de Apache Spark. En la mayoría de los casos, solo es necesario establecer numExecutors y spark.rapids.memory.gpu.reserve. En el caso de los modelos muy grandes, es posible que los usuarios también necesiten configurar el valor spark.kryoserializer.buffer.max. En el caso de los modelos de TensorFlow, los usuarios deberán establecer spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH en true.

En el ejemplo, puede ver cómo se pueden pasar las configuraciones de Spark con el comando %%configure. El significado detallado de cada parámetro se explica en la documentación sobre la configuración de Apache Spark. Los valores que se proporcionan son los valores recomendados para grupos de GPU grande de Azure Synapse.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

En este tutorial, usaremos las siguientes configuraciones:


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

Nota:

Al entrenar con Horovod, los usuarios deben establecer la configuración de Spark para que numExecutors sea menor o igual que el número de nodos.

Configuración de cuenta de almacenamiento principal

Necesitaremos la cuenta de Azure Data Lake Storage (ADLS) para almacenar datos intermedios y de modelos. Si usa una cuenta de almacenamiento alternativa, asegúrese de configurar el servicio vinculado para autenticarse y leer automáticamente desde la cuenta.

En este ejemplo, leeremos datos de la cuenta de almacenamiento principal de Azure Synapse Analytics. Para leer los resultados, debe modificar las siguientes propiedades: remote_url.

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

Preparación del conjunto de datos

A continuación, prepararemos el conjunto de datos para el entrenamiento. En este tutorial, usaremos el conjunto de datos de MNIST de Azure Open Datasets.

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

Definición de los modelos DNN

Una vez procesado el conjunto de datos, podemos definir nuestro modelo de TensorFlow. El mismo código también se puede usar para entrenar un modelo TensorFlow de un solo nodo.

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

Definición de una función de entrenamiento para un único nodo

En primer lugar, entrenaremos nuestro modelo de TensorFlow en el nodo de controlador del grupo de Apache Spark. Una vez completado el proceso de entrenamiento, evaluaremos el modelo e imprimiremos las puntuaciones de pérdida y precisión.


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

Migración a HorovodRunner para el entrenamiento distribuido

A continuación, veremos cómo se podría volver a ejecutar el mismo código con HorovodRunner para el entrenamiento distribuido.

Definición de la función de entrenamiento

Para entrenar un modelo, primero definimos una función de entrenamiento para HorovodRunner.

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

Ejecución del entrenamiento

Una vez definido el modelo, podemos ejecutar el proceso de entrenamiento.

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

Guardar puntos de control en el almacenamiento de ADLS

El código siguiente muestra cómo guardar los puntos de comprobación en la cuenta de Azure Data Lake Storage (ADLS).

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

Evaluación del modelo entrenado de Horovod

Una vez completado el entrenamiento del modelo, podemos echar un vistazo a la pérdida y precisión del modelo final.

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

Limpieza de recursos

Para asegurarse de que se cierra la instancia de Spark, finalice todas las sesiones (cuadernos) conectadas. El grupo se cierra cuando se alcanza el tiempo de inactividad especificado en el grupo de Apache Spark. También puede decidir finalizar la sesión en la barra de estado en la parte superior derecha del cuaderno.

Screenshot showing the Stop session button on the status bar.

Pasos siguientes