Tutorial: verteiltes Training mit Horovod Runner und TensorFlow (veraltet)
Horovod ist ein Framework für verteiltes Training für Bibliotheken wie TensorFlow und PyTorch. Mit Horovod können Benutzer ein bestehendes Trainingsskript mit nur wenigen Zeilen Code so hochskalieren, dass es auf Hunderten von GPUs ausgeführt werden kann.
Innerhalb von Azure Synapse Analytics können Benutzer schnell mit Horovod beginnen, indem sie die standardmäßige Apache Spark 3 Runtime verwenden. Für Spark ML-Pipelineanwendungen, die TensorFlow verwenden, können Sie HorovodRunner
verwenden. Dieses Notebook verwendet einen Apache Spark-Dataframe, um ein verteiltes Training eines verteilten neuronalen Netzes (DNN) mit einem MNIST-Dataset durchzuführen. Dieses Tutorial nutzt TensorFlow und HorovodRunner
, um den Trainingsprozess auszuführen.
Voraussetzungen
- Azure Synapse Analytics-Arbeitsbereich mit einem als Standardspeicher konfigurierten Azure Data Lake Storage Gen2-Speicherkonto. Für das hier verwendete Data Lake Storage Gen2-Dateisystem müssen Sie über die Rolle Mitwirkender an Storage-Blobdaten verfügen.
- Erstellen Sie einen GPU-fähigen Apache Spark-Pool in Ihrem Azure Synapse Analytics-Arbeitsbereich. Ausführliche Informationen finden Sie unter Erstellen eines GPU-fähigen Apache Spark-Pools in Azure Synapse. Für dieses Tutorial empfehlen wir die Verwendung der Clustergröße „GPU: Groß“ mit drei Knoten.
Hinweis
Die Vorschau für GPU-fähige Azure Synapse-Pools ist jetzt veraltet.
Achtung
Benachrichtigung zur Einstellung und Deaktivierung für GPUs unter der Azure Synapse-Runtime für Apache Spark 3.1 und 3.2
- Die Vorschau mit GPU-Beschleunigung ist jetzt für die Apache Spark 3.2-Runtime (veraltet) veraltet. Für veraltete Runtimes werden keine Fehler- und Featurebehebungen mehr bereitgestellt. Diese Runtime und die entsprechende Vorschau mit GPU-Beschleunigung in Spark 3.2 wurden am 8. Juli 2024 eingestellt und deaktiviert.
- Die Vorschau mit GPU-Beschleunigung ist jetzt für die Azure Synapse 3.1-Runtime (veraltet) veraltet. Azure Synapse Runtime für Apache Spark 3.1 hat am 26. Januar 2023 das Supportende erreicht. Der offizielle Support wurde am 26. Januar 2024 eingestellt, und Supporttickets, Fehlerbehebungen oder Sicherheitsupdates nach diesem Datum werden nicht mehr bearbeitet.
Konfigurieren der Apache Spark-Sitzung
Zu Beginn der Sitzung müssen wir ein paar Einstellungen für Apache Spark konfigurieren. In den meisten Fällen müssen wir nur numExecutors
und spark.rapids.memory.gpu.reserve
festlegen. Für sehr große Modelle müssen Benutzer möglicherweise auch die Einstellung spark.kryoserializer.buffer.max
konfigurieren. Für TensorFlow-Modelle müssen die Benutzer spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH
auf „true“ festlegen.
In diesem Beispiel können Sie sehen, wie die Spark-Konfigurationen mit dem Befehl %%configure
übergeben werden können. Die genaue Bedeutung der einzelnen Parameter wird in der Dokumentation zur Apache Spark-Konfiguration erläutert. Die angegebenen Werte sind die vorgeschlagenen, bewährten Methoden für Azure Synapse-Pools vom Typ „GPU: Groß“.
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
Für dieses Tutorial verwenden wir die folgenden Konfigurationen:
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
}
}
Hinweis
Wenn Sie mit Horovod trainieren, sollten Benutzer die Spark-Konfiguration für numExecutors
so festlegen, dass sie kleiner oder gleich der Anzahl der Knoten ist.
Einrichten des primären Speicherkontos
Für die Speicherung von Zwischen- und Modelldaten benötigen wir das Azure Data Lake Storage-Konto (ADLS). Wenn Sie ein alternatives Speicherkonto verwenden, müssen Sie den verknüpften Dienst so einrichten, dass er sich automatisch authentifiziert und von diesem Konto liest.
In diesem Beispiel lesen wir Daten aus dem primären Azure Synapse Analytics-Speicherkonto. Um das Ergebnis zu lesen, müssen Sie die folgenden Eigenschaften ändern: remote_url
.
# Specify training parameters
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1 # learning rate for single node code
# configure adls store remote url
remote_url = "<<abfss path to storage account>>
Vorbereiten des Datasets
Als nächstes bereiten wir das Dataset für das Training vor. In diesem Tutorial verwenden wir das MNIST-Dataset von Azure Open Datasets.
def get_dataset(rank=0, size=1):
# import dependency libs
from azureml.opendatasets import MNIST
from sklearn.preprocessing import OneHotEncoder
import numpy as np
# Download MNIST dataset from Azure Open Datasets
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
x = np.array(mnist_df['features'].values.tolist())
y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)
enc = OneHotEncoder()
enc.fit(y)
y = enc.transform(y).toarray()
(x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
y[60000:])
# Prepare dataset for distributed training
x_train = x_train[rank::size]
y_train = y_train[rank::size]
x_test = x_test[rank::size]
y_test = y_test[rank::size]
# Reshape and Normalize data for model input
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255.0
x_test /= 255.0
return (x_train, y_train), (x_test, y_test)
Definieren von DNN-Modellen
Sobald unser Dataset verarbeitet wurde, können wir unser TensorFlow-Modell definieren. Derselbe Code könnte auch verwendet werden, um ein TensorFlow-Modell mit einem einzelnen Knoten zu trainieren.
# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
from tensorflow.keras import models
from tensorflow.keras import layers
model = models.Sequential()
model.add(
layers.Conv2D(32,
kernel_size=(3, 3),
activation='relu',
input_shape=(28, 28, 1)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Dropout(0.25))
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(10, activation='softmax'))
return model
Definieren einer Trainingsfunktion für einen einzelnen Knoten
Zunächst trainieren wir unser TensorFlow-Modell auf dem Treiberknoten des Apache Spark-Pools. Sobald das Training abgeschlossen ist, werten wir das Modell aus und geben den Score für Verlust und Genauigkeit aus.
def train(learning_rate=0.1):
import tensorflow as tf
from tensorflow import keras
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# Prepare dataset
(x_train, y_train), (x_test, y_test) = get_dataset()
# Initialize model
model = get_model()
# Specify the optimizer (Adadelta in this example)
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
model.fit(x_train,
y_train,
batch_size=batch_size,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
return model
# Run the training process on the driver
model = train(learning_rate=lr_single_node)
# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)
Migrieren zu HorovodRunner für verteiltes Training
Als nächstes sehen wir uns an, wie derselbe Code mithilfe von HorovodRunner
für verteiltes Training ausgeführt werden kann.
Definieren der Trainingsfunktion
Um ein Modell zu trainieren, definieren wir zunächst eine Trainingsfunktion für HorovodRunner
.
# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
# Import base libs
import tempfile
import os
import shutil
import atexit
# Import tensorflow modules to each worker
import tensorflow as tf
from tensorflow import keras
import horovod.tensorflow.keras as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
# These steps are skipped on a CPU cluster
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
'GPU')
# Call the get_dataset function you created, this time with the Horovod rank and size
(x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())
# Initialize model with random weights
model = get_model()
# Adjust learning rate based on number of GPUs
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
hvd.size())
# Use the Horovod Distributed Optimizer
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
# Create a callback to broadcast the initial variable states from rank 0 to all other processes.
# This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
callbacks = [
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# Model checkpoint location.
ckpt_dir = tempfile.mkdtemp()
ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
atexit.register(lambda: shutil.rmtree(ckpt_dir))
# Save checkpoints only on worker 0 to prevent conflicts between workers
if hvd.rank() == 0:
callbacks.append(
keras.callbacks.ModelCheckpoint(ckpt_file,
monitor='val_loss',
mode='min',
save_best_only=True))
model.fit(x_train,
y_train,
batch_size=batch_size,
callbacks=callbacks,
epochs=epochs,
verbose=2,
validation_data=(x_test, y_test))
# Return model bytes only on worker 0
if hvd.rank() == 0:
with open(ckpt_file, 'rb') as f:
return f.read()
Ausführen des Trainings
Sobald das Modell definiert ist, können wir das Training ausführen.
# Run training
import os
import sys
import horovod.spark
best_model_bytes = \
horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
env=os.environ.copy(),
stdout=sys.stdout, stderr=sys.stderr, verbose=2,
prefix_output_with_timestamp=True)[0]
Speichern von Prüfpunkten in ADLS-Speicher
Der Code zeigt, wie Sie die Prüfpunkte im ADLS-Konto (Azure Data Lake Storage) speichern.
import tempfile
import fsspec
import os
local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file
with open(local_ckpt_file, 'wb') as f:
f.write(best_model_bytes)
## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)
print(adls_ckpt_file)
Auswerten des mit Horovod trainierten Modells
Nachdem das Training des Modells abgeschlossen ist, können wir einen Blick auf den Verlust und die Genauigkeit des endgültigen Modells werfen.
import tensorflow as tf
hvd_model = tf.keras.models.load_model(local_ckpt_file)
_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)
Bereinigen von Ressourcen
Beenden Sie alle verbundenen Sitzungen (Notebooks), um sicherzustellen, dass die Spark-Instanz heruntergefahren wird. Der Pool wird heruntergefahren, wenn die im Apache Spark-Pool angegebene Leerlaufzeit erreicht wird. Sie können auch auf der Statusleiste am oberen Rand des Notebooks die Option Sitzung beenden auswählen.