Partager via


Tutoriel : Formation distribuée avec Horovod Runner et TensorFlow (obsolète)

Horovod est une infrastructure d’entraînement distribué pour les bibliothèques TensorFlow et PyTorch. Avec Horovod, les utilisateurs peuvent mettre à l’échelle un script d’entraînement existant pour qu’il s’exécute sur des centaines de GPU en quelques lignes de code.

Dans Azure Synapse Analytics, les utilisateurs peuvent rapidement commencer à utiliser Horovod à l’aide du runtime Apache Spark 3 par défaut. Pour les applications de pipeline Spark ML qui utilisent TensorFlow, les utilisateurs peuvent utiliser HorovodRunner. Ce notebook utilise une trame de données Apache Spark pour effectuer l’entraînement distribué d’un modèle de réseau neuronal distribué (DNN) sur le jeu de données MNIST. Ce tutoriel utilise TensorFlow et HorovodRunner pour exécuter le processus de formation.

Prérequis

  • Espace de travail Azure Synapse Analytics avec un compte de stockage Azure Data Lake Storage Gen2 configuré comme stockage par défaut. Vous devez être le contributeur aux données Blob de stockage du système de fichiers Data Lake Storage Gen2 que vous utilisez.
  • Créez un pool Apache Spark avec processeur graphique (GPU) dans votre espace de travail Azure Synapse Analytics. Pour plus d’informations, consultez Créer un pool Apache Spark avec GPU dans Azure Synapse. Pour ce didacticiel, nous vous suggérons d’utiliser la taille du cluster GPU-Large avec 3 nœuds.

Remarque

La préversion pour les pools Azure Synapse avec GPU est désormais déconseillée.

Attention

Notification de dépréciation et de désactivation des GPU sur le Runtime Azure Synapse pour Apache Spark 3.1 et 3.2

  • La préversion accélérée du GPU est désormais déconseillée sur le runtime Apache Spark 3.2 (déconseillé). Les runtimes déconseillés ne vont pas bénéficier de corrections de bogues et de fonctionnalités. Ce runtime, ainsi que la préversion correspondante accélérée par le GPU sur Spark 3.2 ont été retirés et désactivés depuis le 8 juillet 2024.
  • La préversion accélérée par le GPU est désormais déconseillée sur runtime Azure Synapse 3.1 (déconseillé). Azure Synapse Runtime pour Apache Spark 3.1 a atteint sa fin de support le 26 janvier 2023, le support officiel étant interrompu à compter du 26 janvier 2024 et aucune autre réponse aux tickets de support, aux corrections de bogues ou aux mises à jour de sécurité au-delà de cette date.

Configurer la session Apache Spark

Au début de la session, nous devons configurer quelques paramètres Apache Spark. Dans la plupart des cas, nous devons uniquement définir les numExecutors et spark.rapids.memory.gpu.reserve. Pour les modèles très volumineux, les utilisateurs peuvent également avoir besoin de configurer le paramètre spark.kryoserializer.buffer.max. Pour les modèles TensorFlow, les utilisateurs doivent définir spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH sur true.

Dans cet exemple, vous pouvez voir comment les configurations Spark peuvent être passées avec la commande %%configure. La signification détaillée de chaque paramètre est expliquée dans la documentation de configuration Apache Spark. Les valeurs fournies sont les valeurs suggérées et recommandées pour les pools volumineux Azure Synapse avec GPU.


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

Pour ce tutoriel, nous utilisons les configurations suivantes :


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

Remarque

Lors de l’entraînement avec Horovod, les utilisateurs doivent définir la configuration Spark pour numExecutors afin qu’elle soit inférieure ou égale au nombre de nœuds.

Configurer le compte de stockage principal

Nous avons besoin du compte Azure Data Lake Storage (ADLS) pour stocker les données intermédiaires et de modèle. Si vous utilisez un autre compte de stockage, veillez à configurer le service lié pour authentifier et lire automatiquement à partir du compte.

Dans cet exemple, nous lisons les données à partir du compte de stockage principal Azure Synapse Analytics. Pour lire les résultats, vous devez modifier les propriétés suivantes : remote_url.

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

Préparer le jeu de données

Ensuite, nous préparons le jeu de données pour la formation. Dans ce tutoriel, nous utilisons le jeu de données MNIST à partir d’Azure Open Datasets.

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

Définir un modèle DNN

Une fois notre jeu de données traité, nous pouvons définir notre modèle TensorFlow. Le même code permet également de former un modèle TensorFlow à nœud unique.

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

Définir une fonction d’entraînement pour un seul nœud

Tout d’abord, nous effectuons l’apprentissage de notre modèle TensorFlow sur le nœud du pilote du pool Apache Spark. Une fois le processus de formation terminé, nous évaluons le modèle et imprimons les scores de perte et de précision.


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

Migrer vers HorovodRunner pour un entraînement distribué

Nous allons ensuite examiner comment le même code peut être réexécuter à l’aide de HorovodRunner pour l’entraînement distribué.

Définir une fonction d’entraînement

Pour effectuer l’apprentissage d’un modèle, nous définissons d’abord une fonction de formation pour HorovodRunner.

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

Effectuer l'entraînement

Une fois le modèle défini, nous pouvons exécuter le processus de formation.

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

Enregistrer les points de contrôle dans le stockage ADLS

Ce code montre comment enregistrer les points de contrôle dans le compte Azure Data Lake Storage (ADLS).

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

Évaluer le modèle entraîné Horovod

Une fois la formation du modèle terminée, nous pouvons examiner la perte et la précision pour le modèle final.

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

Nettoyer les ressources

Pour vous assurer que l’instance Spark est arrêtée, mettez fin aux sessions connectées (notebooks). Le pool s’arrête quand la durée d’inactivité spécifiée dans le pool Apache Spark est atteinte. Vous pouvez également sélectionner Arrêter la session dans la barre d’état en haut à droite du notebook.

Capture d’écran montrant le bouton Arrêter la session dans la barre d’État.

Étapes suivantes