Esercitazione: Training distribuito con Horovod Runner e TensorFlow (anteprima)
Horovod è un framework di training distribuito per librerie come TensorFlow e PyTorch. Con Horovod, gli utenti possono aumentare le prestazioni di uno script di training esistente per l'esecuzione su centinaia di GPU in poche righe di codice.
In Azure Synapse Analytics gli utenti possono iniziare rapidamente a usare Horovod usando il runtime predefinito di Apache Spark 3. Per le applicazioni pipeline spark ML che usano TensorFlow, gli utenti possono usare HorovodRunner
. Questo notebook usa un dataframe Apache Spark per eseguire il training distribuito di un modello di rete neurale distribuito (Distributed Neural Network) nel set di dati MNIST. Questa esercitazione usa TensorFlow e per HorovodRunner
eseguire il processo di training.
Prerequisiti
- Area di lavoro di Azure Synapse Analytics con un account di archiviazione di Azure Data Lake Storage Gen2 configurato come risorsa archiviazione predefinita. È necessario essere il Collaboratore ai dati dei BLOB della risorsa di archiviazione del file system di Data Lake Storage Gen2 con cui si lavora.
- Creare un pool di Apache Spark abilitato per GPU nell'area di lavoro di Azure Synapse Analytics. Per informazioni dettagliate, vedere Creare un pool di Apache Spark abilitato per GPU in Azure Synapse. Per questa esercitazione è consigliabile usare le dimensioni del cluster GPU-Large con 3 nodi.
Avviso
- L'anteprima accelerata della GPU è limitata al runtime apache Spark 3.2 (fine del supporto annunciato). La fine del supporto annunciato per Il runtime di Azure Synapse per Apache Spark 3.2 è stata annunciata l'8 luglio 2023. La fine del supporto annuncia i runtime non includerà correzioni di bug e funzionalità. Le correzioni di sicurezza verranno backportate in base alla valutazione dei rischi. Questo runtime e l'anteprima accelerata della GPU corrispondente in Spark 3.2 verranno ritirati e disabilitati a partire dall'8 luglio 2024.
- L'anteprima accelerata della GPU non è ora supportata nel runtime di Azure Synapse 3.1 (non supportato). Il runtime di Azure Synapse per Apache Spark 3.1 ha raggiunto la fine del supporto a partire dal 26 gennaio 2023, con il supporto ufficiale sospeso a partire dal 26 gennaio 2024 e non sono stati risolti ulteriormente i ticket di supporto, le correzioni di bug o gli aggiornamenti della sicurezza oltre questa data.
Configurare la sessione di Apache Spark
All'inizio della sessione è necessario configurare alcune impostazioni di Apache Spark. Nella maggior parte dei casi, è sufficiente impostare e numExecutors
spark.rapids.memory.gpu.reserve
. Per i modelli molto grandi, gli utenti possono anche dover configurare l'impostazione spark.kryoserializer.buffer.max
. Per i modelli TensorFlow, gli utenti devono impostare su spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH
true.
Nell'esempio è possibile vedere come è possibile passare le configurazioni di Spark con il %%configure
comando . Il significato dettagliato di ogni parametro è illustrato nella documentazione di configurazione di Apache Spark. I valori forniti sono i valori consigliati delle procedure consigliate per i pool gpu di Azure Synapse di grandi dimensioni.
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
Per questa esercitazione vengono usate le configurazioni seguenti:
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
}
}
Nota
Quando si esegue il training con Horovod, gli utenti devono impostare la configurazione di Spark per numExecutors
in modo che sia minore o uguale al numero di nodi.
Configurare l'account di archiviazione primario
È necessario l'account azure Data Lake Archiviazione (ADLS) per l'archiviazione dei dati intermedi e del modello. Se si usa un account di archiviazione alternativo, assicurarsi di configurare il servizio collegato per eseguire automaticamente l'autenticazione e la lettura dall'account.
In questo esempio i dati vengono letti dall'account di archiviazione primario di Azure Synapse Analytics. Per leggere i risultati è necessario modificare le proprietà seguenti: remote_url
.
# Specify training parameters
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1 # learning rate for single node code
# configure adls store remote url
remote_url = "<<abfss path to storage account>>
Preparazione del set di dati
Si prepara quindi il set di dati per il training. In questa esercitazione viene usato il set di dati MNIST da Set di dati aperti di Azure.
def get_dataset(rank=0, size=1):
# import dependency libs
from azureml.opendatasets import MNIST
from sklearn.preprocessing import OneHotEncoder
import numpy as np
# Download MNIST dataset from Azure Open Datasets
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
x = np.array(mnist_df['features'].values.tolist())
y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)
enc = OneHotEncoder()
enc.fit(y)
y = enc.transform(y).toarray()
(x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
y[60000:])
# Prepare dataset for distributed training
x_train = x_train[rank::size]
y_train = y_train[rank::size]
x_test = x_test[rank::size]
y_test = y_test[rank::size]
# Reshape and Normalize data for model input
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255.0
x_test /= 255.0
return (x_train, y_train), (x_test, y_test)
Definire il modelli DNN
Una volta elaborato il set di dati, è possibile definire il modello TensorFlow. Lo stesso codice può essere usato anche per eseguire il training di un modello TensorFlow a nodo singolo.
# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
from tensorflow.keras import models
from tensorflow.keras import layers
model = models.Sequential()
model.add(
layers.Conv2D(32,
kernel_size=(3, 3),
activation='relu',
input_shape=(28, 28, 1)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Dropout(0.25))
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(10, activation='softmax'))
return model
Definire una funzione di training per un singolo nodo
Prima di tutto, viene eseguito il training del modello TensorFlow nel nodo driver del pool di Apache Spark. Una volta completato il processo di training, si valuta il modello e si stampano i punteggi di perdita e accuratezza.
def train(learning_rate=0.1):
import tensorflow as tf
from tensorflow import keras
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# Prepare dataset
(x_train, y_train), (x_test, y_test) = get_dataset()
# Initialize model
model = get_model()
# Specify the optimizer (Adadelta in this example)
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
model.fit(x_train,
y_train,
batch_size=batch_size,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
return model
# Run the training process on the driver
model = train(learning_rate=lr_single_node)
# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)
Eseguire la migrazione a HorovodRunner per il training distribuito
Si esaminerà quindi come potrebbe essere eseguito di nuovo lo stesso codice usando HorovodRunner
per il training distribuito.
Definire la funzione di training
Per eseguire il training di un modello, viene prima definita una funzione di training per HorovodRunner
.
# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
# Import base libs
import tempfile
import os
import shutil
import atexit
# Import tensorflow modules to each worker
import tensorflow as tf
from tensorflow import keras
import horovod.tensorflow.keras as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
# These steps are skipped on a CPU cluster
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
'GPU')
# Call the get_dataset function you created, this time with the Horovod rank and size
(x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())
# Initialize model with random weights
model = get_model()
# Adjust learning rate based on number of GPUs
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
hvd.size())
# Use the Horovod Distributed Optimizer
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
# Create a callback to broadcast the initial variable states from rank 0 to all other processes.
# This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
callbacks = [
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# Model checkpoint location.
ckpt_dir = tempfile.mkdtemp()
ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
atexit.register(lambda: shutil.rmtree(ckpt_dir))
# Save checkpoints only on worker 0 to prevent conflicts between workers
if hvd.rank() == 0:
callbacks.append(
keras.callbacks.ModelCheckpoint(ckpt_file,
monitor='val_loss',
mode='min',
save_best_only=True))
model.fit(x_train,
y_train,
batch_size=batch_size,
callbacks=callbacks,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
# Return model bytes only on worker 0
if hvd.rank() == 0:
with open(ckpt_file, 'rb') as f:
return f.read()
Eseguire il training
Dopo aver definito il modello, è possibile eseguire il processo di training.
# Run training
import os
import sys
import horovod.spark
best_model_bytes = \
horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
env=os.environ.copy(),
stdout=sys.stdout, stderr=sys.stderr, verbose=2,
prefix_output_with_timestamp=True)[0]
Salvare i checkpoint nell'archiviazione Azure Data Lake Storage
Il codice illustra come salvare i checkpoint nell'account azure Data Lake Archiviazione (ADLS).
import tempfile
import fsspec
import os
local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file
with open(local_ckpt_file, 'wb') as f:
f.write(best_model_bytes)
## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)
print(adls_ckpt_file)
Valutare il modello con training di Horovod
Una volta completato il training del modello, è possibile esaminare la perdita e l'accuratezza del modello finale.
import tensorflow as tf
hvd_model = tf.keras.models.load_model(local_ckpt_file)
_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)
Pulire le risorse
Per assicurarsi che l'istanza di Spark venga arrestata, terminare tutte le sessioni connesse (notebook). Il pool si arresta quando viene raggiunto il tempo di inattività specificato nel pool di Apache Spark. Si può anche selezionare Termina sessione sulla barra di stato nella parte destra superiore del notebook.
Passaggi successivi
Commenti e suggerimenti
https://aka.ms/ContentUserFeedback.
Presto disponibile: Nel corso del 2024 verranno gradualmente disattivati i problemi di GitHub come meccanismo di feedback per il contenuto e ciò verrà sostituito con un nuovo sistema di feedback. Per altre informazioni, vedereInvia e visualizza il feedback per