Tutoriel : Formation distribuée avec Horovod Runner et TensorFlow (obsolète)
Horovod est une infrastructure d’entraînement distribué pour les bibliothèques TensorFlow et PyTorch. Avec Horovod, les utilisateurs peuvent mettre à l’échelle un script d’entraînement existant pour qu’il s’exécute sur des centaines de GPU en quelques lignes de code.
Dans Azure Synapse Analytics, les utilisateurs peuvent rapidement commencer à utiliser Horovod à l’aide du runtime Apache Spark 3 par défaut. Pour les applications de pipeline Spark ML qui utilisent TensorFlow, les utilisateurs peuvent utiliser HorovodRunner
. Ce notebook utilise une trame de données Apache Spark pour effectuer l’entraînement distribué d’un modèle de réseau neuronal distribué (DNN) sur le jeu de données MNIST. Ce tutoriel utilise TensorFlow et HorovodRunner
pour exécuter le processus de formation.
Prérequis
- Espace de travail Azure Synapse Analytics avec un compte de stockage Azure Data Lake Storage Gen2 configuré comme stockage par défaut. Vous devez être le contributeur aux données Blob de stockage du système de fichiers Data Lake Storage Gen2 que vous utilisez.
- Créez un pool Apache Spark avec processeur graphique (GPU) dans votre espace de travail Azure Synapse Analytics. Pour plus d’informations, consultez Créer un pool Apache Spark avec GPU dans Azure Synapse. Pour ce didacticiel, nous vous suggérons d’utiliser la taille du cluster GPU-Large avec 3 nœuds.
Remarque
La préversion pour les pools Azure Synapse avec GPU est désormais déconseillée.
Attention
Notification de dépréciation et de désactivation des GPU sur le Runtime Azure Synapse pour Apache Spark 3.1 et 3.2
- La préversion accélérée du GPU est désormais déconseillée sur le runtime Apache Spark 3.2 (déconseillé). Les runtimes déconseillés ne vont pas bénéficier de corrections de bogues et de fonctionnalités. Ce runtime, ainsi que la préversion correspondante accélérée par le GPU sur Spark 3.2 ont été retirés et désactivés depuis le 8 juillet 2024.
- La préversion accélérée par le GPU est désormais déconseillée sur runtime Azure Synapse 3.1 (déconseillé). Azure Synapse Runtime pour Apache Spark 3.1 a atteint sa fin de support le 26 janvier 2023, le support officiel étant interrompu à compter du 26 janvier 2024 et aucune autre réponse aux tickets de support, aux corrections de bogues ou aux mises à jour de sécurité au-delà de cette date.
Configurer la session Apache Spark
Au début de la session, nous devons configurer quelques paramètres Apache Spark. Dans la plupart des cas, nous devons uniquement définir les numExecutors
et spark.rapids.memory.gpu.reserve
. Pour les modèles très volumineux, les utilisateurs peuvent également avoir besoin de configurer le paramètre spark.kryoserializer.buffer.max
. Pour les modèles TensorFlow, les utilisateurs doivent définir spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH
sur true.
Dans cet exemple, vous pouvez voir comment les configurations Spark peuvent être passées avec la commande %%configure
. La signification détaillée de chaque paramètre est expliquée dans la documentation de configuration Apache Spark. Les valeurs fournies sont les valeurs suggérées et recommandées pour les pools volumineux Azure Synapse avec GPU.
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
Pour ce tutoriel, nous utilisons les configurations suivantes :
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
}
}
Remarque
Lors de l’entraînement avec Horovod, les utilisateurs doivent définir la configuration Spark pour numExecutors
afin qu’elle soit inférieure ou égale au nombre de nœuds.
Configurer le compte de stockage principal
Nous avons besoin du compte Azure Data Lake Storage (ADLS) pour stocker les données intermédiaires et de modèle. Si vous utilisez un autre compte de stockage, veillez à configurer le service lié pour authentifier et lire automatiquement à partir du compte.
Dans cet exemple, nous lisons les données à partir du compte de stockage principal Azure Synapse Analytics. Pour lire les résultats, vous devez modifier les propriétés suivantes : remote_url
.
# Specify training parameters
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1 # learning rate for single node code
# configure adls store remote url
remote_url = "<<abfss path to storage account>>
Préparer le jeu de données
Ensuite, nous préparons le jeu de données pour la formation. Dans ce tutoriel, nous utilisons le jeu de données MNIST à partir d’Azure Open Datasets.
def get_dataset(rank=0, size=1):
# import dependency libs
from azureml.opendatasets import MNIST
from sklearn.preprocessing import OneHotEncoder
import numpy as np
# Download MNIST dataset from Azure Open Datasets
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
x = np.array(mnist_df['features'].values.tolist())
y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)
enc = OneHotEncoder()
enc.fit(y)
y = enc.transform(y).toarray()
(x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
y[60000:])
# Prepare dataset for distributed training
x_train = x_train[rank::size]
y_train = y_train[rank::size]
x_test = x_test[rank::size]
y_test = y_test[rank::size]
# Reshape and Normalize data for model input
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255.0
x_test /= 255.0
return (x_train, y_train), (x_test, y_test)
Définir un modèle DNN
Une fois notre jeu de données traité, nous pouvons définir notre modèle TensorFlow. Le même code permet également de former un modèle TensorFlow à nœud unique.
# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
from tensorflow.keras import models
from tensorflow.keras import layers
model = models.Sequential()
model.add(
layers.Conv2D(32,
kernel_size=(3, 3),
activation='relu',
input_shape=(28, 28, 1)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Dropout(0.25))
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(10, activation='softmax'))
return model
Définir une fonction d’entraînement pour un seul nœud
Tout d’abord, nous effectuons l’apprentissage de notre modèle TensorFlow sur le nœud du pilote du pool Apache Spark. Une fois le processus de formation terminé, nous évaluons le modèle et imprimons les scores de perte et de précision.
def train(learning_rate=0.1):
import tensorflow as tf
from tensorflow import keras
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# Prepare dataset
(x_train, y_train), (x_test, y_test) = get_dataset()
# Initialize model
model = get_model()
# Specify the optimizer (Adadelta in this example)
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
model.fit(x_train,
y_train,
batch_size=batch_size,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
return model
# Run the training process on the driver
model = train(learning_rate=lr_single_node)
# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)
Migrer vers HorovodRunner pour un entraînement distribué
Nous allons ensuite examiner comment le même code peut être réexécuter à l’aide de HorovodRunner
pour l’entraînement distribué.
Définir une fonction d’entraînement
Pour effectuer l’apprentissage d’un modèle, nous définissons d’abord une fonction de formation pour HorovodRunner
.
# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
# Import base libs
import tempfile
import os
import shutil
import atexit
# Import tensorflow modules to each worker
import tensorflow as tf
from tensorflow import keras
import horovod.tensorflow.keras as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
# These steps are skipped on a CPU cluster
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
'GPU')
# Call the get_dataset function you created, this time with the Horovod rank and size
(x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())
# Initialize model with random weights
model = get_model()
# Adjust learning rate based on number of GPUs
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
hvd.size())
# Use the Horovod Distributed Optimizer
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
# Create a callback to broadcast the initial variable states from rank 0 to all other processes.
# This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
callbacks = [
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# Model checkpoint location.
ckpt_dir = tempfile.mkdtemp()
ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
atexit.register(lambda: shutil.rmtree(ckpt_dir))
# Save checkpoints only on worker 0 to prevent conflicts between workers
if hvd.rank() == 0:
callbacks.append(
keras.callbacks.ModelCheckpoint(ckpt_file,
monitor='val_loss',
mode='min',
save_best_only=True))
model.fit(x_train,
y_train,
batch_size=batch_size,
callbacks=callbacks,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
# Return model bytes only on worker 0
if hvd.rank() == 0:
with open(ckpt_file, 'rb') as f:
return f.read()
Effectuer l'entraînement
Une fois le modèle défini, nous pouvons exécuter le processus de formation.
# Run training
import os
import sys
import horovod.spark
best_model_bytes = \
horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
env=os.environ.copy(),
stdout=sys.stdout, stderr=sys.stderr, verbose=2,
prefix_output_with_timestamp=True)[0]
Enregistrer les points de contrôle dans le stockage ADLS
Ce code montre comment enregistrer les points de contrôle dans le compte Azure Data Lake Storage (ADLS).
import tempfile
import fsspec
import os
local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file
with open(local_ckpt_file, 'wb') as f:
f.write(best_model_bytes)
## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)
print(adls_ckpt_file)
Évaluer le modèle entraîné Horovod
Une fois la formation du modèle terminée, nous pouvons examiner la perte et la précision pour le modèle final.
import tensorflow as tf
hvd_model = tf.keras.models.load_model(local_ckpt_file)
_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)
Nettoyer les ressources
Pour vous assurer que l’instance Spark est arrêtée, mettez fin aux sessions connectées (notebooks). Le pool s’arrête quand la durée d’inactivité spécifiée dans le pool Apache Spark est atteinte. Vous pouvez également sélectionner Arrêter la session dans la barre d’état en haut à droite du notebook.